From: Amir Shehata Date: Wed, 28 Nov 2018 01:54:51 +0000 (-0800) Subject: LU-11415 socklnd: improve scheduling algorithm X-Git-Tag: 2.12.51~73 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=89df5e712ffd40064f1d4ce2f00f9156f68a2262 LU-11415 socklnd: improve scheduling algorithm Modified the scheduling algorithm to use all available scheduler threads. Previously, a connection was assigned a single thread and could only use that one. With this patch, any scheduler thread available on the assigned CPT can pick up and work on requests queued on the connection. Signed-off-by: Amir Shehata Change-Id: I7e8e5220e1914155fd793994485e3a443fe1ba44 Reviewed-on: https://review.whamcloud.com/33740 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Jinshan Xiong Reviewed-by: Olaf Weber Reviewed-by: Patrick Farrell Reviewed-by: Oleg Drokin --- diff --git a/lnet/klnds/socklnd/socklnd.c b/lnet/klnds/socklnd/socklnd.c index dd59e93..ef20ef4 100644 --- a/lnet/klnds/socklnd/socklnd.c +++ b/lnet/klnds/socklnd/socklnd.c @@ -665,33 +665,20 @@ ksocknal_get_conn_by_idx(struct lnet_ni *ni, int index) static struct ksock_sched * ksocknal_choose_scheduler_locked(unsigned int cpt) { - struct ksock_sched_info *info = ksocknal_data.ksnd_sched_info[cpt]; - struct ksock_sched *sched; + struct ksock_sched *sched = ksocknal_data.ksnd_schedulers[cpt]; int i; - if (info->ksi_nthreads == 0) { - cfs_percpt_for_each(info, i, ksocknal_data.ksnd_sched_info) { - if (info->ksi_nthreads > 0) { + if (sched->kss_nthreads == 0) { + cfs_percpt_for_each(sched, i, ksocknal_data.ksnd_schedulers) { + if (sched->kss_nthreads > 0) { CDEBUG(D_NET, "scheduler[%d] has no threads. selected scheduler[%d]\n", - cpt, info->ksi_cpt); - goto select_sched; + cpt, sched->kss_cpt); + return sched; } } return NULL; } -select_sched: - sched = &info->ksi_scheds[0]; - /* - * NB: it's safe so far, but info->ksi_nthreads could be changed - * at runtime when we have dynamic LNet configuration, then we - * need to take care of this. - */ - for (i = 1; i < info->ksi_nthreads; i++) { - if (sched->kss_nconns > info->ksi_scheds[i].kss_nconns) - sched = &info->ksi_scheds[i]; - } - return sched; } @@ -1280,7 +1267,7 @@ ksocknal_create_conn(struct lnet_ni *ni, struct ksock_route *route, * The cpt might have changed if we ended up selecting a non cpt * native scheduler. So use the scheduler's cpt instead. */ - cpt = sched->kss_info->ksi_cpt; + cpt = sched->kss_cpt; sched->kss_nconns++; conn->ksnc_scheduler = sched; @@ -1319,11 +1306,10 @@ ksocknal_create_conn(struct lnet_ni *ni, struct ksock_route *route, */ CDEBUG(D_NET, "New conn %s p %d.x %pI4h -> %pI4h/%d" - " incarnation:%lld sched[%d:%d]\n", + " incarnation:%lld sched[%d]\n", libcfs_id2str(peerid), conn->ksnc_proto->pro_version, &conn->ksnc_myipaddr, &conn->ksnc_ipaddr, - conn->ksnc_port, incarnation, cpt, - (int)(sched - &sched->kss_info->ksi_scheds[0])); + conn->ksnc_port, incarnation, cpt); if (active) { /* additional routes after interface exchange?
*/ @@ -2208,7 +2194,7 @@ ksocknal_ctl(struct lnet_ni *ni, unsigned int cmd, void *arg) data->ioc_u32[1] = conn->ksnc_port; data->ioc_u32[2] = conn->ksnc_myipaddr; data->ioc_u32[3] = conn->ksnc_type; - data->ioc_u32[4] = conn->ksnc_scheduler->kss_info->ksi_cpt; + data->ioc_u32[4] = conn->ksnc_scheduler->kss_cpt; data->ioc_u32[5] = rxmem; data->ioc_u32[6] = conn->ksnc_peer->ksnp_id.pid; ksocknal_conn_decref(conn); @@ -2247,19 +2233,8 @@ ksocknal_free_buffers (void) { LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_txs) == 0); - if (ksocknal_data.ksnd_sched_info != NULL) { - struct ksock_sched_info *info; - int i; - - cfs_percpt_for_each(info, i, ksocknal_data.ksnd_sched_info) { - if (info->ksi_scheds != NULL) { - LIBCFS_FREE(info->ksi_scheds, - info->ksi_nthreads_max * - sizeof(info->ksi_scheds[0])); - } - } - cfs_percpt_free(ksocknal_data.ksnd_sched_info); - } + if (ksocknal_data.ksnd_schedulers != NULL) + cfs_percpt_free(ksocknal_data.ksnd_schedulers); LIBCFS_FREE (ksocknal_data.ksnd_peers, sizeof(struct list_head) * @@ -2288,10 +2263,8 @@ ksocknal_free_buffers (void) static void ksocknal_base_shutdown(void) { - struct ksock_sched_info *info; struct ksock_sched *sched; int i; - int j; CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n", atomic_read (&libcfs_kmemory)); @@ -2314,23 +2287,14 @@ ksocknal_base_shutdown(void) LASSERT(list_empty(&ksocknal_data.ksnd_connd_connreqs)); LASSERT(list_empty(&ksocknal_data.ksnd_connd_routes)); - if (ksocknal_data.ksnd_sched_info != NULL) { - cfs_percpt_for_each(info, i, - ksocknal_data.ksnd_sched_info) { - if (info->ksi_scheds == NULL) - continue; - - for (j = 0; j < info->ksi_nthreads_max; j++) { + if (ksocknal_data.ksnd_schedulers != NULL) { + cfs_percpt_for_each(sched, i, + ksocknal_data.ksnd_schedulers) { - sched = &info->ksi_scheds[j]; - LASSERT(list_empty(&sched->\ - kss_tx_conns)); - LASSERT(list_empty(&sched->\ - kss_rx_conns)); - LASSERT(list_empty(&sched-> \ - kss_zombie_noop_txs)); - LASSERT(sched->kss_nconns == 0); - } + LASSERT(list_empty(&sched->kss_tx_conns)); + LASSERT(list_empty(&sched->kss_rx_conns)); + LASSERT(list_empty(&sched->kss_zombie_noop_txs)); + LASSERT(sched->kss_nconns == 0); } } @@ -2339,17 +2303,10 @@ ksocknal_base_shutdown(void) wake_up_all(&ksocknal_data.ksnd_connd_waitq); wake_up_all(&ksocknal_data.ksnd_reaper_waitq); - if (ksocknal_data.ksnd_sched_info != NULL) { - cfs_percpt_for_each(info, i, - ksocknal_data.ksnd_sched_info) { - if (info->ksi_scheds == NULL) - continue; - - for (j = 0; j < info->ksi_nthreads_max; j++) { - sched = &info->ksi_scheds[j]; + if (ksocknal_data.ksnd_schedulers != NULL) { + cfs_percpt_for_each(sched, i, + ksocknal_data.ksnd_schedulers) wake_up_all(&sched->kss_waitq); - } - } } i = 4; @@ -2382,9 +2339,9 @@ ksocknal_base_shutdown(void) static int ksocknal_base_startup(void) { - struct ksock_sched_info *info; - int rc; - int i; + struct ksock_sched *sched; + int rc; + int i; LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING); LASSERT (ksocknal_data.ksnd_nnets == 0); @@ -2424,45 +2381,38 @@ ksocknal_base_startup(void) ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA; try_module_get(THIS_MODULE); - ksocknal_data.ksnd_sched_info = cfs_percpt_alloc(lnet_cpt_table(), - sizeof(*info)); - if (ksocknal_data.ksnd_sched_info == NULL) + /* Create a scheduler block per available CPT */ + ksocknal_data.ksnd_schedulers = cfs_percpt_alloc(lnet_cpt_table(), + sizeof(*sched)); + if (ksocknal_data.ksnd_schedulers == NULL) goto failed; - cfs_percpt_for_each(info, i, ksocknal_data.ksnd_sched_info) { - struct 
ksock_sched *sched; + cfs_percpt_for_each(sched, i, ksocknal_data.ksnd_schedulers) { int nthrs; + /* + * make sure not to allocate more threads than there are + * cores/CPUs in teh CPT + */ nthrs = cfs_cpt_weight(lnet_cpt_table(), i); if (*ksocknal_tunables.ksnd_nscheds > 0) { nthrs = min(nthrs, *ksocknal_tunables.ksnd_nscheds); } else { - /* max to half of CPUs, assume another half should be - * reserved for upper layer modules */ + /* + * max to half of CPUs, assume another half should be + * reserved for upper layer modules + */ nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs); } - info->ksi_nthreads_max = nthrs; - info->ksi_cpt = i; - - if (nthrs != 0) { - LIBCFS_CPT_ALLOC(info->ksi_scheds, lnet_cpt_table(), i, - info->ksi_nthreads_max * - sizeof(*sched)); - if (info->ksi_scheds == NULL) - goto failed; - - for (; nthrs > 0; nthrs--) { - sched = &info->ksi_scheds[nthrs - 1]; - - sched->kss_info = info; - spin_lock_init(&sched->kss_lock); - INIT_LIST_HEAD(&sched->kss_rx_conns); - INIT_LIST_HEAD(&sched->kss_tx_conns); - INIT_LIST_HEAD(&sched->kss_zombie_noop_txs); - init_waitqueue_head(&sched->kss_waitq); - } - } + sched->kss_nthreads_max = nthrs; + sched->kss_cpt = i; + + spin_lock_init(&sched->kss_lock); + INIT_LIST_HEAD(&sched->kss_rx_conns); + INIT_LIST_HEAD(&sched->kss_tx_conns); + INIT_LIST_HEAD(&sched->kss_zombie_noop_txs); + init_waitqueue_head(&sched->kss_waitq); } ksocknal_data.ksnd_connd_starting = 0; @@ -2720,37 +2670,35 @@ ksocknal_search_new_ipif(struct ksock_net *net) } static int -ksocknal_start_schedulers(struct ksock_sched_info *info) +ksocknal_start_schedulers(struct ksock_sched *sched) { int nthrs; int rc = 0; int i; - if (info->ksi_nthreads == 0) { + if (sched->kss_nthreads == 0) { if (*ksocknal_tunables.ksnd_nscheds > 0) { - nthrs = info->ksi_nthreads_max; + nthrs = sched->kss_nthreads_max; } else { nthrs = cfs_cpt_weight(lnet_cpt_table(), - info->ksi_cpt); + sched->kss_cpt); nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs); nthrs = min(SOCKNAL_NSCHEDS_HIGH, nthrs); } - nthrs = min(nthrs, info->ksi_nthreads_max); + nthrs = min(nthrs, sched->kss_nthreads_max); } else { - LASSERT(info->ksi_nthreads <= info->ksi_nthreads_max); + LASSERT(sched->kss_nthreads <= sched->kss_nthreads_max); /* increase two threads if there is new interface */ - nthrs = min(2, info->ksi_nthreads_max - info->ksi_nthreads); + nthrs = min(2, sched->kss_nthreads_max - sched->kss_nthreads); } for (i = 0; i < nthrs; i++) { long id; char name[20]; - struct ksock_sched *sched; - id = KSOCK_THREAD_ID(info->ksi_cpt, info->ksi_nthreads + i); - sched = &info->ksi_scheds[KSOCK_THREAD_SID(id)]; + id = KSOCK_THREAD_ID(sched->kss_cpt, sched->kss_nthreads + i); snprintf(name, sizeof(name), "socknal_sd%02d_%02d", - info->ksi_cpt, (int)(sched - &info->ksi_scheds[0])); + sched->kss_cpt, (int)KSOCK_THREAD_SID(id)); rc = ksocknal_thread_start(ksocknal_scheduler, (void *)id, name); @@ -2758,11 +2706,11 @@ ksocknal_start_schedulers(struct ksock_sched_info *info) continue; CERROR("Can't spawn thread %d for scheduler[%d]: %d\n", - info->ksi_cpt, info->ksi_nthreads + i, rc); + sched->kss_cpt, (int) KSOCK_THREAD_SID(id), rc); break; } - info->ksi_nthreads += i; + sched->kss_nthreads += i; return rc; } @@ -2777,16 +2725,16 @@ ksocknal_net_start_threads(struct ksock_net *net, __u32 *cpts, int ncpts) return -EINVAL; for (i = 0; i < ncpts; i++) { - struct ksock_sched_info *info; + struct ksock_sched *sched; int cpt = (cpts == NULL) ? 
i : cpts[i]; LASSERT(cpt < cfs_cpt_number(lnet_cpt_table())); - info = ksocknal_data.ksnd_sched_info[cpt]; + sched = ksocknal_data.ksnd_schedulers[cpt]; - if (!newif && info->ksi_nthreads > 0) + if (!newif && sched->kss_nthreads > 0) continue; - rc = ksocknal_start_schedulers(info); + rc = ksocknal_start_schedulers(sched); if (rc != 0) return rc; } diff --git a/lnet/klnds/socklnd/socklnd.h b/lnet/klnds/socklnd/socklnd.h index 82101e4..9f198dd 100644 --- a/lnet/klnds/socklnd/socklnd.h +++ b/lnet/klnds/socklnd/socklnd.h @@ -88,32 +88,25 @@ # define SOCKNAL_RISK_KMAP_DEADLOCK 1 #endif -struct ksock_sched_info; - -struct ksock_sched { /* per scheduler state */ - spinlock_t kss_lock; /* serialise */ - struct list_head kss_rx_conns; /* conn waiting to be read */ +/* per scheduler state */ +struct ksock_sched { + /* serialise */ + spinlock_t kss_lock; /* conn waiting to be written */ - struct list_head kss_tx_conns; + struct list_head kss_rx_conns; + struct list_head kss_tx_conns; /* zombie noop tx list */ - struct list_head kss_zombie_noop_txs; - wait_queue_head_t kss_waitq; /* where scheduler sleeps */ + struct list_head kss_zombie_noop_txs; + /* where scheduler sleeps */ + wait_queue_head_t kss_waitq; /* # connections assigned to this scheduler */ - int kss_nconns; - struct ksock_sched_info *kss_info; /* owner of it */ -#if !SOCKNAL_SINGLE_FRAG_RX - struct page *kss_rx_scratch_pgs[LNET_MAX_IOV]; -#endif -#if !SOCKNAL_SINGLE_FRAG_TX || !SOCKNAL_SINGLE_FRAG_RX - struct kvec kss_scratch_iov[LNET_MAX_IOV]; -#endif -}; - -struct ksock_sched_info { - int ksi_nthreads_max; /* max allowed threads */ - int ksi_nthreads; /* number of threads */ - int ksi_cpt; /* CPT id */ - struct ksock_sched *ksi_scheds; /* array of schedulers */ + int kss_nconns; + /* max allowed threads */ + int kss_nthreads_max; + /* number of threads */ + int kss_nthreads; + /* CPT id */ + int kss_cpt; }; #define KSOCK_CPT_SHIFT 16 @@ -199,7 +192,7 @@ struct ksock_nal_data { int ksnd_nthreads; /* # live threads */ int ksnd_shuttingdown; /* tell threads to exit */ /* schedulers information */ - struct ksock_sched_info **ksnd_sched_info; + struct ksock_sched **ksnd_schedulers; atomic_t ksnd_nactive_txs; /* #active txs */ @@ -662,11 +655,15 @@ extern void ksocknal_lib_reset_callback(struct socket *sock, extern void ksocknal_lib_push_conn(struct ksock_conn *conn); extern int ksocknal_lib_get_conn_addrs(struct ksock_conn *conn); extern int ksocknal_lib_setup_sock(struct socket *so); -extern int ksocknal_lib_send_iov(struct ksock_conn *conn, struct ksock_tx *tx); -extern int ksocknal_lib_send_kiov(struct ksock_conn *conn, struct ksock_tx *tx); +extern int ksocknal_lib_send_iov(struct ksock_conn *conn, struct ksock_tx *tx, + struct kvec *scratch_iov); +extern int ksocknal_lib_send_kiov(struct ksock_conn *conn, struct ksock_tx *tx, + struct kvec *scratch_iov); extern void ksocknal_lib_eager_ack(struct ksock_conn *conn); -extern int ksocknal_lib_recv_iov(struct ksock_conn *conn); -extern int ksocknal_lib_recv_kiov(struct ksock_conn *conn); +extern int ksocknal_lib_recv_iov(struct ksock_conn *conn, + struct kvec *scratchiov); +extern int ksocknal_lib_recv_kiov(struct ksock_conn *conn, struct page **pages, + struct kvec *scratchiov); extern int ksocknal_lib_get_conn_tunables(struct ksock_conn *conn, int *txmem, int *rxmem, int *nagle); diff --git a/lnet/klnds/socklnd/socklnd_cb.c b/lnet/klnds/socklnd/socklnd_cb.c index 056e080..962f6c6 100644 --- a/lnet/klnds/socklnd/socklnd_cb.c +++ b/lnet/klnds/socklnd/socklnd_cb.c @@ -111,82 +111,85 @@ 
ksocknal_free_tx(struct ksock_tx *tx) } static int -ksocknal_send_iov(struct ksock_conn *conn, struct ksock_tx *tx) +ksocknal_send_iov(struct ksock_conn *conn, struct ksock_tx *tx, + struct kvec *scratch_iov) { struct kvec *iov = tx->tx_iov; - int nob; - int rc; + int nob; + int rc; - LASSERT (tx->tx_niov > 0); + LASSERT(tx->tx_niov > 0); - /* Never touch tx->tx_iov inside ksocknal_lib_send_iov() */ - rc = ksocknal_lib_send_iov(conn, tx); + /* Never touch tx->tx_iov inside ksocknal_lib_send_iov() */ + rc = ksocknal_lib_send_iov(conn, tx, scratch_iov); - if (rc <= 0) /* sent nothing? */ - return (rc); + if (rc <= 0) /* sent nothing? */ + return rc; - nob = rc; - LASSERT (nob <= tx->tx_resid); - tx->tx_resid -= nob; + nob = rc; + LASSERT(nob <= tx->tx_resid); + tx->tx_resid -= nob; - /* "consume" iov */ - do { - LASSERT (tx->tx_niov > 0); + /* "consume" iov */ + do { + LASSERT(tx->tx_niov > 0); - if (nob < (int) iov->iov_len) { + if (nob < (int) iov->iov_len) { iov->iov_base += nob; - iov->iov_len -= nob; - return (rc); - } + iov->iov_len -= nob; + return rc; + } - nob -= iov->iov_len; - tx->tx_iov = ++iov; - tx->tx_niov--; - } while (nob != 0); + nob -= iov->iov_len; + tx->tx_iov = ++iov; + tx->tx_niov--; + } while (nob != 0); - return (rc); + return rc; } static int -ksocknal_send_kiov(struct ksock_conn *conn, struct ksock_tx *tx) +ksocknal_send_kiov(struct ksock_conn *conn, struct ksock_tx *tx, + struct kvec *scratch_iov) { lnet_kiov_t *kiov = tx->tx_kiov; int nob; int rc; - LASSERT (tx->tx_niov == 0); - LASSERT (tx->tx_nkiov > 0); + LASSERT(tx->tx_niov == 0); + LASSERT(tx->tx_nkiov > 0); - /* Never touch tx->tx_kiov inside ksocknal_lib_send_kiov() */ - rc = ksocknal_lib_send_kiov(conn, tx); + /* Never touch tx->tx_kiov inside ksocknal_lib_send_kiov() */ + rc = ksocknal_lib_send_kiov(conn, tx, scratch_iov); - if (rc <= 0) /* sent nothing? */ - return (rc); + if (rc <= 0) /* sent nothing? */ + return rc; - nob = rc; - LASSERT (nob <= tx->tx_resid); - tx->tx_resid -= nob; + nob = rc; + LASSERT(nob <= tx->tx_resid); + tx->tx_resid -= nob; - /* "consume" kiov */ - do { - LASSERT(tx->tx_nkiov > 0); + /* "consume" kiov */ + do { + LASSERT(tx->tx_nkiov > 0); - if (nob < (int)kiov->kiov_len) { - kiov->kiov_offset += nob; - kiov->kiov_len -= nob; - return rc; - } + if (nob < (int)kiov->kiov_len) { + kiov->kiov_offset += nob; + kiov->kiov_len -= nob; + return rc; + } - nob -= (int)kiov->kiov_len; - tx->tx_kiov = ++kiov; - tx->tx_nkiov--; - } while (nob != 0); + nob -= (int)kiov->kiov_len; + tx->tx_kiov = ++kiov; + tx->tx_nkiov--; + } while (nob != 0); - return (rc); + return rc; } static int -ksocknal_transmit(struct ksock_conn *conn, struct ksock_tx *tx) +ksocknal_transmit(struct ksock_conn *conn, struct ksock_tx *tx, + struct kvec *scratch_iov) { int rc; int bufnob; @@ -198,26 +201,26 @@ ksocknal_transmit(struct ksock_conn *conn, struct ksock_tx *tx) LASSERT(tx->tx_resid != 0); - rc = ksocknal_connsock_addref(conn); - if (rc != 0) { - LASSERT (conn->ksnc_closing); - return (-ESHUTDOWN); - } + rc = ksocknal_connsock_addref(conn); + if (rc != 0) { + LASSERT(conn->ksnc_closing); + return -ESHUTDOWN; + } - do { - if (ksocknal_data.ksnd_enomem_tx > 0) { - /* testing... */ - ksocknal_data.ksnd_enomem_tx--; - rc = -EAGAIN; - } else if (tx->tx_niov != 0) { - rc = ksocknal_send_iov (conn, tx); - } else { - rc = ksocknal_send_kiov (conn, tx); - } + do { + if (ksocknal_data.ksnd_enomem_tx > 0) { + /* testing... 
*/ + ksocknal_data.ksnd_enomem_tx--; + rc = -EAGAIN; + } else if (tx->tx_niov != 0) { + rc = ksocknal_send_iov(conn, tx, scratch_iov); + } else { + rc = ksocknal_send_kiov(conn, tx, scratch_iov); + } bufnob = conn->ksnc_sock->sk->sk_wmem_queued; - if (rc > 0) /* sent something? */ - conn->ksnc_tx_bufnob += rc; /* account it */ + if (rc > 0) /* sent something? */ + conn->ksnc_tx_bufnob += rc; /* account it */ if (bufnob < conn->ksnc_tx_bufnob) { /* allocated send buffer bytes < computed; infer @@ -230,45 +233,45 @@ ksocknal_transmit(struct ksock_conn *conn, struct ksock_tx *tx) } if (rc <= 0) { /* Didn't write anything? */ + /* some stacks return 0 instead of -EAGAIN */ + if (rc == 0) + rc = -EAGAIN; - if (rc == 0) /* some stacks return 0 instead of -EAGAIN */ - rc = -EAGAIN; - - /* Check if EAGAIN is due to memory pressure */ - if(rc == -EAGAIN && ksocknal_lib_memory_pressure(conn)) - rc = -ENOMEM; + /* Check if EAGAIN is due to memory pressure */ + if (rc == -EAGAIN && ksocknal_lib_memory_pressure(conn)) + rc = -ENOMEM; - break; - } + break; + } - /* socket's wmem_queued now includes 'rc' bytes */ + /* socket's wmem_queued now includes 'rc' bytes */ atomic_sub (rc, &conn->ksnc_tx_nob); - rc = 0; + rc = 0; - } while (tx->tx_resid != 0); + } while (tx->tx_resid != 0); - ksocknal_connsock_decref(conn); - return (rc); + ksocknal_connsock_decref(conn); + return rc; } static int -ksocknal_recv_iov(struct ksock_conn *conn) +ksocknal_recv_iov(struct ksock_conn *conn, struct kvec *scratchiov) { struct kvec *iov = conn->ksnc_rx_iov; - int nob; - int rc; + int nob; + int rc; - LASSERT (conn->ksnc_rx_niov > 0); + LASSERT(conn->ksnc_rx_niov > 0); /* Never touch conn->ksnc_rx_iov or change connection - * status inside ksocknal_lib_recv_iov */ - rc = ksocknal_lib_recv_iov(conn); + * status inside ksocknal_lib_recv_iov */ + rc = ksocknal_lib_recv_iov(conn, scratchiov); - if (rc <= 0) - return (rc); + if (rc <= 0) + return rc; - /* received something... */ - nob = rc; + /* received something... */ + nob = rc; conn->ksnc_peer->ksnp_last_alive = ktime_get_seconds(); conn->ksnc_rx_deadline = ktime_get_seconds() + @@ -279,40 +282,41 @@ ksocknal_recv_iov(struct ksock_conn *conn) conn->ksnc_rx_nob_wanted -= nob; conn->ksnc_rx_nob_left -= nob; - do { - LASSERT (conn->ksnc_rx_niov > 0); + do { + LASSERT(conn->ksnc_rx_niov > 0); - if (nob < (int)iov->iov_len) { - iov->iov_len -= nob; + if (nob < (int)iov->iov_len) { + iov->iov_len -= nob; iov->iov_base += nob; - return (-EAGAIN); - } + return -EAGAIN; + } - nob -= iov->iov_len; - conn->ksnc_rx_iov = ++iov; - conn->ksnc_rx_niov--; - } while (nob != 0); + nob -= iov->iov_len; + conn->ksnc_rx_iov = ++iov; + conn->ksnc_rx_niov--; + } while (nob != 0); - return (rc); + return rc; } static int -ksocknal_recv_kiov(struct ksock_conn *conn) +ksocknal_recv_kiov(struct ksock_conn *conn, struct page **rx_scratch_pgs, + struct kvec *scratch_iov) { lnet_kiov_t *kiov = conn->ksnc_rx_kiov; int nob; int rc; - LASSERT (conn->ksnc_rx_nkiov > 0); + LASSERT(conn->ksnc_rx_nkiov > 0); /* Never touch conn->ksnc_rx_kiov or change connection - * status inside ksocknal_lib_recv_iov */ - rc = ksocknal_lib_recv_kiov(conn); + * status inside ksocknal_lib_recv_iov */ + rc = ksocknal_lib_recv_kiov(conn, rx_scratch_pgs, scratch_iov); - if (rc <= 0) - return (rc); + if (rc <= 0) + return rc; - /* received something... */ - nob = rc; + /* received something... 
*/ + nob = rc; conn->ksnc_peer->ksnp_last_alive = ktime_get_seconds(); conn->ksnc_rx_deadline = ktime_get_seconds() + @@ -323,70 +327,72 @@ ksocknal_recv_kiov(struct ksock_conn *conn) conn->ksnc_rx_nob_wanted -= nob; conn->ksnc_rx_nob_left -= nob; - do { - LASSERT (conn->ksnc_rx_nkiov > 0); + do { + LASSERT(conn->ksnc_rx_nkiov > 0); - if (nob < (int) kiov->kiov_len) { - kiov->kiov_offset += nob; - kiov->kiov_len -= nob; - return -EAGAIN; - } + if (nob < (int) kiov->kiov_len) { + kiov->kiov_offset += nob; + kiov->kiov_len -= nob; + return -EAGAIN; + } - nob -= kiov->kiov_len; - conn->ksnc_rx_kiov = ++kiov; - conn->ksnc_rx_nkiov--; - } while (nob != 0); + nob -= kiov->kiov_len; + conn->ksnc_rx_kiov = ++kiov; + conn->ksnc_rx_nkiov--; + } while (nob != 0); - return 1; + return 1; } static int -ksocknal_receive(struct ksock_conn *conn) +ksocknal_receive(struct ksock_conn *conn, struct page **rx_scratch_pgs, + struct kvec *scratch_iov) { - /* Return 1 on success, 0 on EOF, < 0 on error. - * Caller checks ksnc_rx_nob_wanted to determine - * progress/completion. */ - int rc; - ENTRY; + /* Return 1 on success, 0 on EOF, < 0 on error. + * Caller checks ksnc_rx_nob_wanted to determine + * progress/completion. */ + int rc; + ENTRY; if (ksocknal_data.ksnd_stall_rx != 0) { set_current_state(TASK_UNINTERRUPTIBLE); schedule_timeout(cfs_time_seconds(ksocknal_data.ksnd_stall_rx)); } - rc = ksocknal_connsock_addref(conn); - if (rc != 0) { - LASSERT (conn->ksnc_closing); - return (-ESHUTDOWN); - } + rc = ksocknal_connsock_addref(conn); + if (rc != 0) { + LASSERT(conn->ksnc_closing); + return -ESHUTDOWN; + } - for (;;) { - if (conn->ksnc_rx_niov != 0) - rc = ksocknal_recv_iov (conn); - else - rc = ksocknal_recv_kiov (conn); - - if (rc <= 0) { - /* error/EOF or partial receive */ - if (rc == -EAGAIN) { - rc = 1; - } else if (rc == 0 && conn->ksnc_rx_started) { - /* EOF in the middle of a message */ - rc = -EPROTO; - } - break; - } + for (;;) { + if (conn->ksnc_rx_niov != 0) + rc = ksocknal_recv_iov(conn, scratch_iov); + else + rc = ksocknal_recv_kiov(conn, rx_scratch_pgs, + scratch_iov); - /* Completed a fragment */ + if (rc <= 0) { + /* error/EOF or partial receive */ + if (rc == -EAGAIN) { + rc = 1; + } else if (rc == 0 && conn->ksnc_rx_started) { + /* EOF in the middle of a message */ + rc = -EPROTO; + } + break; + } - if (conn->ksnc_rx_nob_wanted == 0) { - rc = 1; - break; - } - } + /* Completed a fragment */ + + if (conn->ksnc_rx_nob_wanted == 0) { + rc = 1; + break; + } + } - ksocknal_connsock_decref(conn); - RETURN (rc); + ksocknal_connsock_decref(conn); + RETURN(rc); } void @@ -530,7 +536,8 @@ ksocknal_uncheck_zc_req(struct ksock_tx *tx) } static int -ksocknal_process_transmit(struct ksock_conn *conn, struct ksock_tx *tx) +ksocknal_process_transmit(struct ksock_conn *conn, struct ksock_tx *tx, + struct kvec *scratch_iov) { int rc; bool error_sim = false; @@ -541,38 +548,38 @@ ksocknal_process_transmit(struct ksock_conn *conn, struct ksock_tx *tx) goto simulate_error; } - if (tx->tx_zc_capable && !tx->tx_zc_checked) - ksocknal_check_zc_req(tx); + if (tx->tx_zc_capable && !tx->tx_zc_checked) + ksocknal_check_zc_req(tx); - rc = ksocknal_transmit (conn, tx); + rc = ksocknal_transmit(conn, tx, scratch_iov); - CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc); + CDEBUG(D_NET, "send(%d) %d\n", tx->tx_resid, rc); - if (tx->tx_resid == 0) { - /* Sent everything OK */ - LASSERT (rc == 0); + if (tx->tx_resid == 0) { + /* Sent everything OK */ + LASSERT(rc == 0); - return (0); - } + return 0; + } - if (rc == 
-EAGAIN) - return (rc); + if (rc == -EAGAIN) + return rc; - if (rc == -ENOMEM) { - static int counter; + if (rc == -ENOMEM) { + static int counter; - counter++; /* exponential backoff warnings */ - if ((counter & (-counter)) == counter) - CWARN("%u ENOMEM tx %p (%u allocated)\n", + counter++; /* exponential backoff warnings */ + if ((counter & (-counter)) == counter) + CWARN("%u ENOMEM tx %p (%u allocated)\n", counter, conn, atomic_read(&libcfs_kmemory)); - /* Queue on ksnd_enomem_conns for retry after a timeout */ + /* Queue on ksnd_enomem_conns for retry after a timeout */ spin_lock_bh(&ksocknal_data.ksnd_reaper_lock); - /* enomem list takes over scheduler's ref... */ - LASSERT (conn->ksnc_tx_scheduled); + /* enomem list takes over scheduler's ref... */ + LASSERT(conn->ksnc_tx_scheduled); list_add_tail(&conn->ksnc_tx_list, - &ksocknal_data.ksnd_enomem_conns); + &ksocknal_data.ksnd_enomem_conns); if (ktime_get_seconds() + SOCKNAL_ENOMEM_RETRY < ksocknal_data.ksnd_reaper_waketime) wake_up(&ksocknal_data.ksnd_reaper_waitq); @@ -1164,7 +1171,9 @@ ksocknal_new_packet(struct ksock_conn *conn, int nob_to_skip) } static int -ksocknal_process_receive(struct ksock_conn *conn) +ksocknal_process_receive(struct ksock_conn *conn, + struct page **rx_scratch_pgs, + struct kvec *scratch_iov) { struct lnet_hdr *lhdr; struct lnet_process_id *id; @@ -1174,13 +1183,14 @@ ksocknal_process_receive(struct ksock_conn *conn) /* NB: sched lock NOT held */ /* SOCKNAL_RX_LNET_HEADER is here for backward compatibility */ - LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_KSM_HEADER || - conn->ksnc_rx_state == SOCKNAL_RX_LNET_PAYLOAD || - conn->ksnc_rx_state == SOCKNAL_RX_LNET_HEADER || - conn->ksnc_rx_state == SOCKNAL_RX_SLOP); + LASSERT(conn->ksnc_rx_state == SOCKNAL_RX_KSM_HEADER || + conn->ksnc_rx_state == SOCKNAL_RX_LNET_PAYLOAD || + conn->ksnc_rx_state == SOCKNAL_RX_LNET_HEADER || + conn->ksnc_rx_state == SOCKNAL_RX_SLOP); again: - if (conn->ksnc_rx_nob_wanted != 0) { - rc = ksocknal_receive(conn); + if (conn->ksnc_rx_nob_wanted != 0) { + rc = ksocknal_receive(conn, rx_scratch_pgs, + scratch_iov); if (rc <= 0) { struct lnet_process_id ksnp_id; @@ -1449,154 +1459,169 @@ ksocknal_sched_cansleep(struct ksock_sched *sched) int ksocknal_scheduler(void *arg) { - struct ksock_sched_info *info; struct ksock_sched *sched; struct ksock_conn *conn; struct ksock_tx *tx; int rc; int nloops = 0; long id = (long)arg; + struct page **rx_scratch_pgs; + struct kvec *scratch_iov; - info = ksocknal_data.ksnd_sched_info[KSOCK_THREAD_CPT(id)]; - sched = &info->ksi_scheds[KSOCK_THREAD_SID(id)]; + sched = ksocknal_data.ksnd_schedulers[KSOCK_THREAD_CPT(id)]; + + LIBCFS_CPT_ALLOC(rx_scratch_pgs, lnet_cpt_table(), sched->kss_cpt, + sizeof(*rx_scratch_pgs) * LNET_MAX_IOV); + if (!rx_scratch_pgs) { + CERROR("Unable to allocate scratch pages\n"); + return -ENOMEM; + } + + LIBCFS_CPT_ALLOC(scratch_iov, lnet_cpt_table(), sched->kss_cpt, + sizeof(*scratch_iov) * LNET_MAX_IOV); + if (!scratch_iov) { + CERROR("Unable to allocate scratch iov\n"); + return -ENOMEM; + } cfs_block_allsigs(); - rc = cfs_cpt_bind(lnet_cpt_table(), info->ksi_cpt); + rc = cfs_cpt_bind(lnet_cpt_table(), sched->kss_cpt); if (rc != 0) { CWARN("Can't set CPU partition affinity to %d: %d\n", - info->ksi_cpt, rc); + sched->kss_cpt, rc); } spin_lock_bh(&sched->kss_lock); - while (!ksocknal_data.ksnd_shuttingdown) { - int did_something = 0; + while (!ksocknal_data.ksnd_shuttingdown) { + int did_something = 0; - /* Ensure I progress everything semi-fairly */ + /* Ensure I progress 
everything semi-fairly */ if (!list_empty(&sched->kss_rx_conns)) { conn = list_entry(sched->kss_rx_conns.next, struct ksock_conn, ksnc_rx_list); list_del(&conn->ksnc_rx_list); - LASSERT(conn->ksnc_rx_scheduled); - LASSERT(conn->ksnc_rx_ready); + LASSERT(conn->ksnc_rx_scheduled); + LASSERT(conn->ksnc_rx_ready); - /* clear rx_ready in case receive isn't complete. - * Do it BEFORE we call process_recv, since - * data_ready can set it any time after we release - * kss_lock. */ - conn->ksnc_rx_ready = 0; + /* clear rx_ready in case receive isn't complete. + * Do it BEFORE we call process_recv, since + * data_ready can set it any time after we release + * kss_lock. */ + conn->ksnc_rx_ready = 0; spin_unlock_bh(&sched->kss_lock); - rc = ksocknal_process_receive(conn); + rc = ksocknal_process_receive(conn, rx_scratch_pgs, + scratch_iov); spin_lock_bh(&sched->kss_lock); - /* I'm the only one that can clear this flag */ - LASSERT(conn->ksnc_rx_scheduled); + /* I'm the only one that can clear this flag */ + LASSERT(conn->ksnc_rx_scheduled); - /* Did process_receive get everything it wanted? */ - if (rc == 0) - conn->ksnc_rx_ready = 1; - - if (conn->ksnc_rx_state == SOCKNAL_RX_PARSE) { - /* Conn blocked waiting for ksocknal_recv() - * I change its state (under lock) to signal - * it can be rescheduled */ - conn->ksnc_rx_state = SOCKNAL_RX_PARSE_WAIT; - } else if (conn->ksnc_rx_ready) { - /* reschedule for rx */ + /* Did process_receive get everything it wanted? */ + if (rc == 0) + conn->ksnc_rx_ready = 1; + + if (conn->ksnc_rx_state == SOCKNAL_RX_PARSE) { + /* Conn blocked waiting for ksocknal_recv() + * I change its state (under lock) to signal + * it can be rescheduled */ + conn->ksnc_rx_state = SOCKNAL_RX_PARSE_WAIT; + } else if (conn->ksnc_rx_ready) { + /* reschedule for rx */ list_add_tail(&conn->ksnc_rx_list, - &sched->kss_rx_conns); - } else { - conn->ksnc_rx_scheduled = 0; - /* drop my ref */ - ksocknal_conn_decref(conn); - } + &sched->kss_rx_conns); + } else { + conn->ksnc_rx_scheduled = 0; + /* drop my ref */ + ksocknal_conn_decref(conn); + } - did_something = 1; - } + did_something = 1; + } if (!list_empty(&sched->kss_tx_conns)) { struct list_head zlist = LIST_HEAD_INIT(zlist); if (!list_empty(&sched->kss_zombie_noop_txs)) { list_add(&zlist, - &sched->kss_zombie_noop_txs); + &sched->kss_zombie_noop_txs); list_del_init(&sched->kss_zombie_noop_txs); - } + } conn = list_entry(sched->kss_tx_conns.next, struct ksock_conn, ksnc_tx_list); list_del(&conn->ksnc_tx_list); - LASSERT(conn->ksnc_tx_scheduled); - LASSERT(conn->ksnc_tx_ready); + LASSERT(conn->ksnc_tx_scheduled); + LASSERT(conn->ksnc_tx_ready); LASSERT(!list_empty(&conn->ksnc_tx_queue)); tx = list_entry(conn->ksnc_tx_queue.next, struct ksock_tx, tx_list); - if (conn->ksnc_tx_carrier == tx) - ksocknal_next_tx_carrier(conn); + if (conn->ksnc_tx_carrier == tx) + ksocknal_next_tx_carrier(conn); - /* dequeue now so empty list => more to send */ + /* dequeue now so empty list => more to send */ list_del(&tx->tx_list); - /* Clear tx_ready in case send isn't complete. Do - * it BEFORE we call process_transmit, since - * write_space can set it any time after we release - * kss_lock. */ - conn->ksnc_tx_ready = 0; + /* Clear tx_ready in case send isn't complete. Do + * it BEFORE we call process_transmit, since + * write_space can set it any time after we release + * kss_lock. 
*/ + conn->ksnc_tx_ready = 0; spin_unlock_bh(&sched->kss_lock); if (!list_empty(&zlist)) { /* free zombie noop txs, it's fast because - * noop txs are just put in freelist */ - ksocknal_txlist_done(NULL, &zlist, 0); - } + * noop txs are just put in freelist */ + ksocknal_txlist_done(NULL, &zlist, 0); + } - rc = ksocknal_process_transmit(conn, tx); + rc = ksocknal_process_transmit(conn, tx, scratch_iov); - if (rc == -ENOMEM || rc == -EAGAIN) { - /* Incomplete send: replace tx on HEAD of tx_queue */ + if (rc == -ENOMEM || rc == -EAGAIN) { + /* Incomplete send: replace tx on HEAD of tx_queue */ spin_lock_bh(&sched->kss_lock); list_add(&tx->tx_list, - &conn->ksnc_tx_queue); + &conn->ksnc_tx_queue); } else { /* Complete send; tx -ref */ ksocknal_tx_decref(tx); spin_lock_bh(&sched->kss_lock); - /* assume space for more */ - conn->ksnc_tx_ready = 1; - } + /* assume space for more */ + conn->ksnc_tx_ready = 1; + } - if (rc == -ENOMEM) { - /* Do nothing; after a short timeout, this - * conn will be reposted on kss_tx_conns. */ - } else if (conn->ksnc_tx_ready && + if (rc == -ENOMEM) { + /* Do nothing; after a short timeout, this + * conn will be reposted on kss_tx_conns. */ + } else if (conn->ksnc_tx_ready && !list_empty(&conn->ksnc_tx_queue)) { - /* reschedule for tx */ + /* reschedule for tx */ list_add_tail(&conn->ksnc_tx_list, - &sched->kss_tx_conns); - } else { - conn->ksnc_tx_scheduled = 0; - /* drop my ref */ - ksocknal_conn_decref(conn); - } + &sched->kss_tx_conns); + } else { + conn->ksnc_tx_scheduled = 0; + /* drop my ref */ + ksocknal_conn_decref(conn); + } - did_something = 1; - } - if (!did_something || /* nothing to do */ - ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */ + did_something = 1; + } + if (!did_something || /* nothing to do */ + ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? 
*/ spin_unlock_bh(&sched->kss_lock); - nloops = 0; + nloops = 0; - if (!did_something) { /* wait for something to do */ + if (!did_something) { /* wait for something to do */ rc = wait_event_interruptible_exclusive( sched->kss_waitq, !ksocknal_sched_cansleep(sched)); @@ -1610,6 +1635,10 @@ int ksocknal_scheduler(void *arg) } spin_unlock_bh(&sched->kss_lock); + LIBCFS_FREE(rx_scratch_pgs, sizeof(*rx_scratch_pgs) * + LNET_MAX_IOV); + LIBCFS_FREE(scratch_iov, sizeof(*scratch_iov) * + LNET_MAX_IOV); ksocknal_thread_fini(); return 0; } diff --git a/lnet/klnds/socklnd/socklnd_lib.c b/lnet/klnds/socklnd/socklnd_lib.c index 7277cdf..a5654df 100644 --- a/lnet/klnds/socklnd/socklnd_lib.c +++ b/lnet/klnds/socklnd/socklnd_lib.c @@ -71,7 +71,8 @@ ksocknal_lib_zc_capable(struct ksock_conn *conn) } int -ksocknal_lib_send_iov(struct ksock_conn *conn, struct ksock_tx *tx) +ksocknal_lib_send_iov(struct ksock_conn *conn, struct ksock_tx *tx, + struct kvec *scratchiov) { struct socket *sock = conn->ksnc_sock; int nob; @@ -92,7 +93,6 @@ ksocknal_lib_send_iov(struct ksock_conn *conn, struct ksock_tx *tx) struct kvec *scratchiov = &scratch; unsigned int niov = 1; #else - struct kvec *scratchiov = conn->ksnc_scheduler->kss_scratch_iov; unsigned int niov = tx->tx_niov; #endif struct msghdr msg = { .msg_flags = MSG_DONTWAIT }; @@ -113,41 +113,42 @@ ksocknal_lib_send_iov(struct ksock_conn *conn, struct ksock_tx *tx) } int -ksocknal_lib_send_kiov(struct ksock_conn *conn, struct ksock_tx *tx) +ksocknal_lib_send_kiov(struct ksock_conn *conn, struct ksock_tx *tx, + struct kvec *scratchiov) { - struct socket *sock = conn->ksnc_sock; - lnet_kiov_t *kiov = tx->tx_kiov; - int rc; - int nob; + struct socket *sock = conn->ksnc_sock; + lnet_kiov_t *kiov = tx->tx_kiov; + int rc; + int nob; - /* Not NOOP message */ - LASSERT (tx->tx_lnetmsg != NULL); + /* Not NOOP message */ + LASSERT(tx->tx_lnetmsg != NULL); - /* NB we can't trust socket ops to either consume our iovs - * or leave them alone. */ - if (tx->tx_msg.ksm_zc_cookies[0] != 0) { - /* Zero copy is enabled */ - struct sock *sk = sock->sk; - struct page *page = kiov->kiov_page; - int offset = kiov->kiov_offset; - int fragsize = kiov->kiov_len; - int msgflg = MSG_DONTWAIT; + /* NB we can't trust socket ops to either consume our iovs + * or leave them alone. */ + if (tx->tx_msg.ksm_zc_cookies[0] != 0) { + /* Zero copy is enabled */ + struct sock *sk = sock->sk; + struct page *page = kiov->kiov_page; + int offset = kiov->kiov_offset; + int fragsize = kiov->kiov_len; + int msgflg = MSG_DONTWAIT; - CDEBUG(D_NET, "page %p + offset %x for %d\n", - page, offset, kiov->kiov_len); + CDEBUG(D_NET, "page %p + offset %x for %d\n", + page, offset, kiov->kiov_len); if (!list_empty(&conn->ksnc_tx_queue) || - fragsize < tx->tx_resid) - msgflg |= MSG_MORE; - - if (sk->sk_prot->sendpage != NULL) { - rc = sk->sk_prot->sendpage(sk, page, - offset, fragsize, msgflg); - } else { - rc = cfs_tcp_sendpage(sk, page, offset, fragsize, - msgflg); - } - } else { + fragsize < tx->tx_resid) + msgflg |= MSG_MORE; + + if (sk->sk_prot->sendpage != NULL) { + rc = sk->sk_prot->sendpage(sk, page, + offset, fragsize, msgflg); + } else { + rc = cfs_tcp_sendpage(sk, page, offset, fragsize, + msgflg); + } + } else { #if SOCKNAL_SINGLE_FRAG_TX || !SOCKNAL_RISK_KMAP_DEADLOCK struct kvec scratch; struct kvec *scratchiov = &scratch; @@ -156,7 +157,6 @@ ksocknal_lib_send_kiov(struct ksock_conn *conn, struct ksock_tx *tx) #ifdef CONFIG_HIGHMEM #warning "XXX risk of kmap deadlock on multiple frags..." 
#endif - struct kvec *scratchiov = conn->ksnc_scheduler->kss_scratch_iov; unsigned int niov = tx->tx_nkiov; #endif struct msghdr msg = { .msg_flags = MSG_DONTWAIT }; @@ -196,14 +196,13 @@ ksocknal_lib_eager_ack(struct ksock_conn *conn) } int -ksocknal_lib_recv_iov(struct ksock_conn *conn) +ksocknal_lib_recv_iov(struct ksock_conn *conn, struct kvec *scratchiov) { #if SOCKNAL_SINGLE_FRAG_RX struct kvec scratch; struct kvec *scratchiov = &scratch; unsigned int niov = 1; #else - struct kvec *scratchiov = conn->ksnc_scheduler->kss_scratch_iov; unsigned int niov = conn->ksnc_rx_niov; #endif struct kvec *iov = conn->ksnc_rx_iov; @@ -301,7 +300,8 @@ ksocknal_lib_kiov_vmap(lnet_kiov_t *kiov, int niov, } int -ksocknal_lib_recv_kiov(struct ksock_conn *conn) +ksocknal_lib_recv_kiov(struct ksock_conn *conn, struct page **pages, + struct kvec *scratchiov) { #if SOCKNAL_SINGLE_FRAG_RX || !SOCKNAL_RISK_KMAP_DEADLOCK struct kvec scratch; @@ -312,8 +312,6 @@ ksocknal_lib_recv_kiov(struct ksock_conn *conn) #ifdef CONFIG_HIGHMEM #warning "XXX risk of kmap deadlock on multiple frags..." #endif - struct kvec *scratchiov = conn->ksnc_scheduler->kss_scratch_iov; - struct page **pages = conn->ksnc_scheduler->kss_rx_scratch_pgs; unsigned int niov = conn->ksnc_rx_nkiov; #endif lnet_kiov_t *kiov = conn->ksnc_rx_kiov;
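
For readers skimming the patch, the following standalone sketch illustrates the scheduling model the change moves to. It is a simplified analogy written with plain pthreads, not the ksocklnd code: one scheduler block per CPT, several worker threads sharing that block's connection queue (so any available thread can pick up queued work, as the commit message describes), and scratch buffers owned by each thread rather than by a per-thread scheduler slot. Every identifier in the sketch (struct sched, worker, NTHREADS_PER_CPT, SCRATCH_SLOTS, and so on) is invented for illustration and only loosely mirrors kss_lock, kss_rx_conns/kss_tx_conns, kss_waitq, and the per-thread rx_scratch_pgs/scratch_iov allocations added to ksocknal_scheduler().

/*
 * Minimal pthreads sketch of the post-patch model -- this is NOT the
 * ksocklnd code.  One scheduler block serves one CPT; every worker
 * thread bound to that block pulls connections from the shared queue,
 * and each thread owns its own scratch buffers instead of sharing
 * per-scheduler scratch arrays.  All names below are illustrative.
 */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define NTHREADS_PER_CPT 2	/* stand-in for kss_nthreads */
#define SCRATCH_SLOTS	16	/* stand-in for LNET_MAX_IOV */

struct conn {
	int id;
	struct conn *next;
};

/* one block per CPT: shared queue + lock + waitq, like struct ksock_sched */
struct sched {
	pthread_mutex_t lock;
	pthread_cond_t waitq;
	struct conn *queue;
	int shutdown;
};

static void *worker(void *arg)
{
	struct sched *sched = arg;
	/* per-thread scratch space, allocated when the thread starts */
	void **scratch = calloc(SCRATCH_SLOTS, sizeof(*scratch));

	pthread_mutex_lock(&sched->lock);
	while (!sched->shutdown || sched->queue != NULL) {
		struct conn *conn = sched->queue;

		if (conn == NULL) {
			/* nothing queued: sleep until work or shutdown */
			pthread_cond_wait(&sched->waitq, &sched->lock);
			continue;
		}
		sched->queue = conn->next;
		pthread_mutex_unlock(&sched->lock);

		/* "process" the connection with this thread's scratch buffers */
		printf("thread %lu handled conn %d\n",
		       (unsigned long)pthread_self(), conn->id);
		free(conn);

		pthread_mutex_lock(&sched->lock);
	}
	pthread_mutex_unlock(&sched->lock);
	free(scratch);
	return NULL;
}

int main(void)
{
	static struct sched sched = {
		.lock = PTHREAD_MUTEX_INITIALIZER,
		.waitq = PTHREAD_COND_INITIALIZER,
	};
	pthread_t tid[NTHREADS_PER_CPT];
	int i;

	for (i = 0; i < NTHREADS_PER_CPT; i++)
		pthread_create(&tid[i], NULL, worker, &sched);

	/* queue a few "connections"; any worker in the block may take each one */
	for (i = 0; i < 8; i++) {
		struct conn *conn = malloc(sizeof(*conn));

		conn->id = i;
		pthread_mutex_lock(&sched.lock);
		conn->next = sched.queue;
		sched.queue = conn;
		pthread_cond_signal(&sched.waitq);
		pthread_mutex_unlock(&sched.lock);
	}

	pthread_mutex_lock(&sched.lock);
	sched.shutdown = 1;
	pthread_cond_broadcast(&sched.waitq);
	pthread_mutex_unlock(&sched.lock);

	for (i = 0; i < NTHREADS_PER_CPT; i++)
		pthread_join(tid[i], NULL);
	return 0;
}

The design point the sketch captures is the one the patch relies on: once the queue and lock are shared across all threads of a CPT, load spreads to whichever threads are free, so the old least-loaded-thread selection in ksocknal_choose_scheduler_locked() becomes unnecessary, and the scratch buffers must move from struct ksock_sched into each thread because a connection is no longer pinned to a single thread.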