- ksock_sched_t *sched = (ksock_sched_t *)arg;
- ksock_conn_t *conn;
- ksock_tx_t *tx;
- int rc;
- int nloops = 0;
- int id = (int)(sched - ksocknal_data.ksnd_schedulers);
- char name[16];
-
- snprintf (name, sizeof (name),"socknal_sd%02d", id);
- cfs_daemonize (name);
- cfs_block_allsigs ();
-
- if (ksocknal_lib_bind_thread_to_cpu(id))
- CERROR ("Can't set CPU affinity for %s to %d\n", name, id);
-
- cfs_spin_lock_bh (&sched->kss_lock);
-
- while (!ksocknal_data.ksnd_shuttingdown) {
- int did_something = 0;
-
- /* Ensure I progress everything semi-fairly */
-
- if (!cfs_list_empty (&sched->kss_rx_conns)) {
- conn = cfs_list_entry(sched->kss_rx_conns.next,
- ksock_conn_t, ksnc_rx_list);
- cfs_list_del(&conn->ksnc_rx_list);
-
- LASSERT(conn->ksnc_rx_scheduled);
- LASSERT(conn->ksnc_rx_ready);
-
- /* clear rx_ready in case receive isn't complete.
- * Do it BEFORE we call process_recv, since
- * data_ready can set it any time after we release
- * kss_lock. */
- conn->ksnc_rx_ready = 0;
- cfs_spin_unlock_bh (&sched->kss_lock);
-
- rc = ksocknal_process_receive(conn);
-
- cfs_spin_lock_bh (&sched->kss_lock);
-
- /* I'm the only one that can clear this flag */
- LASSERT(conn->ksnc_rx_scheduled);
-
- /* Did process_receive get everything it wanted? */
- if (rc == 0)
- conn->ksnc_rx_ready = 1;
-
- if (conn->ksnc_rx_state == SOCKNAL_RX_PARSE) {
- /* Conn blocked waiting for ksocknal_recv()
- * I change its state (under lock) to signal
- * it can be rescheduled */
- conn->ksnc_rx_state = SOCKNAL_RX_PARSE_WAIT;
- } else if (conn->ksnc_rx_ready) {
- /* reschedule for rx */
- cfs_list_add_tail (&conn->ksnc_rx_list,
- &sched->kss_rx_conns);
- } else {
- conn->ksnc_rx_scheduled = 0;
- /* drop my ref */
- ksocknal_conn_decref(conn);
- }
-
- did_something = 1;
- }
-
- if (!cfs_list_empty (&sched->kss_tx_conns)) {
- CFS_LIST_HEAD (zlist);
-
- if (!cfs_list_empty(&sched->kss_zombie_noop_txs)) {
- cfs_list_add(&zlist,
- &sched->kss_zombie_noop_txs);
- cfs_list_del_init(&sched->kss_zombie_noop_txs);
- }
-
- conn = cfs_list_entry(sched->kss_tx_conns.next,
- ksock_conn_t, ksnc_tx_list);
- cfs_list_del (&conn->ksnc_tx_list);
-
- LASSERT(conn->ksnc_tx_scheduled);
- LASSERT(conn->ksnc_tx_ready);
- LASSERT(!cfs_list_empty(&conn->ksnc_tx_queue));
-
- tx = cfs_list_entry(conn->ksnc_tx_queue.next,
- ksock_tx_t, tx_list);
-
- if (conn->ksnc_tx_carrier == tx)
- ksocknal_next_tx_carrier(conn);
-
- /* dequeue now so empty list => more to send */
- cfs_list_del(&tx->tx_list);
-
- /* Clear tx_ready in case send isn't complete. Do
- * it BEFORE we call process_transmit, since
- * write_space can set it any time after we release
- * kss_lock. */
- conn->ksnc_tx_ready = 0;
- cfs_spin_unlock_bh (&sched->kss_lock);
-
- if (!cfs_list_empty(&zlist)) {
- /* free zombie noop txs, it's fast because
- * noop txs are just put in freelist */
- ksocknal_txlist_done(NULL, &zlist, 0);
- }
-
- rc = ksocknal_process_transmit(conn, tx);
-
- if (rc == -ENOMEM || rc == -EAGAIN) {
- /* Incomplete send: replace tx on HEAD of tx_queue */
- cfs_spin_lock_bh (&sched->kss_lock);
- cfs_list_add (&tx->tx_list,
- &conn->ksnc_tx_queue);
- } else {
- /* Complete send; tx -ref */
- ksocknal_tx_decref (tx);
-
- cfs_spin_lock_bh (&sched->kss_lock);
- /* assume space for more */
- conn->ksnc_tx_ready = 1;
- }
-
- if (rc == -ENOMEM) {
- /* Do nothing; after a short timeout, this
- * conn will be reposted on kss_tx_conns. */
- } else if (conn->ksnc_tx_ready &&
- !cfs_list_empty (&conn->ksnc_tx_queue)) {
- /* reschedule for tx */
- cfs_list_add_tail (&conn->ksnc_tx_list,
- &sched->kss_tx_conns);
- } else {
- conn->ksnc_tx_scheduled = 0;
- /* drop my ref */
- ksocknal_conn_decref(conn);
- }
-
- did_something = 1;
- }
- if (!did_something || /* nothing to do */
- ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */
- cfs_spin_unlock_bh (&sched->kss_lock);
-
- nloops = 0;
-
- if (!did_something) { /* wait for something to do */
- cfs_wait_event_interruptible_exclusive(
- sched->kss_waitq,
- !ksocknal_sched_cansleep(sched), rc);
- LASSERT (rc == 0);
- } else {
- cfs_cond_resched();
- }
-
- cfs_spin_lock_bh (&sched->kss_lock);
- }
- }
-
- cfs_spin_unlock_bh (&sched->kss_lock);
- ksocknal_thread_fini ();
- return (0);
+ struct ksock_sched *sched;
+ struct ksock_conn *conn;
+ struct ksock_tx *tx;
+ int rc;
+ int nloops = 0;
+ long id = (long)arg;
+ struct page **rx_scratch_pgs;
+ struct kvec *scratch_iov;
+
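+	/* the thread id packs both the CPT and the scheduler index within
+	 * it; KSOCK_THREAD_CPT() recovers the CPT for the lookup below */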
+ sched = ksocknal_data.ksnd_schedulers[KSOCK_THREAD_CPT(id)];
+
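+	/* per-thread scratch buffers for the receive and transmit paths,
+	 * allocated on this scheduler's CPT to keep the memory NUMA-local */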
+ LIBCFS_CPT_ALLOC(rx_scratch_pgs, lnet_cpt_table(), sched->kss_cpt,
+ sizeof(*rx_scratch_pgs) * LNET_MAX_IOV);
+ if (!rx_scratch_pgs) {
+ CERROR("Unable to allocate scratch pages\n");
+ return -ENOMEM;
+ }
+
+ LIBCFS_CPT_ALLOC(scratch_iov, lnet_cpt_table(), sched->kss_cpt,
+ sizeof(*scratch_iov) * LNET_MAX_IOV);
+	if (!scratch_iov) {
+		CERROR("Unable to allocate scratch iov\n");
+		CFS_FREE_PTR_ARRAY(rx_scratch_pgs, LNET_MAX_IOV);
+		return -ENOMEM;
+	}
+
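+	/* bind this thread to its CPU partition; failure is non-fatal and
+	 * the scheduler simply runs unbound */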
+ rc = cfs_cpt_bind(lnet_cpt_table(), sched->kss_cpt);
+ if (rc != 0) {
+ CWARN("Can't set CPU partition affinity to %d: %d\n",
+ sched->kss_cpt, rc);
+ }
+
+ spin_lock_bh(&sched->kss_lock);
+
+ while (!ksocknal_data.ksnd_shuttingdown) {
+ int did_something = 0;
+
+ /* Ensure I progress everything semi-fairly */
+
+ if (!list_empty(&sched->kss_rx_conns)) {
+ conn = list_entry(sched->kss_rx_conns.next,
+ struct ksock_conn, ksnc_rx_list);
+ list_del(&conn->ksnc_rx_list);
+
+ LASSERT(conn->ksnc_rx_scheduled);
+ LASSERT(conn->ksnc_rx_ready);
+
+ /* clear rx_ready in case receive isn't complete.
+ * Do it BEFORE we call process_recv, since
+ * data_ready can set it any time after we release
+ * kss_lock. */
+ conn->ksnc_rx_ready = 0;
+ spin_unlock_bh(&sched->kss_lock);
+
+ rc = ksocknal_process_receive(conn, rx_scratch_pgs,
+ scratch_iov);
+
+ spin_lock_bh(&sched->kss_lock);
+
+ /* I'm the only one that can clear this flag */
+ LASSERT(conn->ksnc_rx_scheduled);
+
+ /* Did process_receive get everything it wanted? */
+ if (rc == 0)
+ conn->ksnc_rx_ready = 1;
+
+ if (conn->ksnc_rx_state == SOCKNAL_RX_PARSE) {
+				/* Conn blocked waiting for ksocknal_recv();
+				 * I change its state (under lock) to signal
+				 * that it can be rescheduled */
+ conn->ksnc_rx_state = SOCKNAL_RX_PARSE_WAIT;
+ } else if (conn->ksnc_rx_ready) {
+ /* reschedule for rx */
+ list_add_tail(&conn->ksnc_rx_list,
+ &sched->kss_rx_conns);
+ } else {
+ conn->ksnc_rx_scheduled = 0;
+ /* drop my ref */
+ ksocknal_conn_decref(conn);
+ }
+
+ did_something = 1;
+ }
+
+ if (!list_empty(&sched->kss_tx_conns)) {
+ LIST_HEAD(zlist);
+
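+			/* pull any zombie noop txs onto a local list in one
+			 * step; they are freed below, after kss_lock is
+			 * dropped */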
+ list_splice_init(&sched->kss_zombie_noop_txs, &zlist);
+
+ conn = list_entry(sched->kss_tx_conns.next,
+ struct ksock_conn, ksnc_tx_list);
+ list_del(&conn->ksnc_tx_list);
+
+ LASSERT(conn->ksnc_tx_scheduled);
+ LASSERT(conn->ksnc_tx_ready);
+ LASSERT(!list_empty(&conn->ksnc_tx_queue));
+
+ tx = list_entry(conn->ksnc_tx_queue.next,
+ struct ksock_tx, tx_list);
+
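+			/* if this tx is the current carrier (where noops and
+			 * ZC-ACKs can piggyback), advance the carrier before
+			 * sending it */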
+ if (conn->ksnc_tx_carrier == tx)
+ ksocknal_next_tx_carrier(conn);
+
+ /* dequeue now so empty list => more to send */
+ list_del(&tx->tx_list);
+
+ /* Clear tx_ready in case send isn't complete. Do
+ * it BEFORE we call process_transmit, since
+ * write_space can set it any time after we release
+ * kss_lock. */
+ conn->ksnc_tx_ready = 0;
+ spin_unlock_bh(&sched->kss_lock);
+
+ if (!list_empty(&zlist)) {
+				/* freeing zombie noop txs is fast because
+				 * they are simply returned to the freelist */
+ ksocknal_txlist_done(NULL, &zlist, 0);
+ }
+
+ rc = ksocknal_process_transmit(conn, tx, scratch_iov);
+
+ if (rc == -ENOMEM || rc == -EAGAIN) {
+ /* Incomplete send: replace tx on HEAD of tx_queue */
+ spin_lock_bh(&sched->kss_lock);
+				list_add(&tx->tx_list, &conn->ksnc_tx_queue);
+ } else {
+ /* Complete send; tx -ref */
+ ksocknal_tx_decref(tx);
+
+ spin_lock_bh(&sched->kss_lock);
+ /* assume space for more */
+ conn->ksnc_tx_ready = 1;
+ }
+
+ if (rc == -ENOMEM) {
+ /* Do nothing; after a short timeout, this
+ * conn will be reposted on kss_tx_conns. */
+ } else if (conn->ksnc_tx_ready &&
+ !list_empty(&conn->ksnc_tx_queue)) {
+ /* reschedule for tx */
+ list_add_tail(&conn->ksnc_tx_list,
+ &sched->kss_tx_conns);
+ } else {
+ conn->ksnc_tx_scheduled = 0;
+ /* drop my ref */
+ ksocknal_conn_decref(conn);
+ }
+
+ did_something = 1;
+ }
+ if (!did_something || /* nothing to do */
+ ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */
+ spin_unlock_bh(&sched->kss_lock);
+
+ nloops = 0;
+
+ if (!did_something) { /* wait for something to do */
+ rc = wait_event_interruptible_exclusive(
+ sched->kss_waitq,
+ !ksocknal_sched_cansleep(sched));
+				LASSERT(rc == 0);
+ } else {
+ cond_resched();
+ }
+
+ spin_lock_bh(&sched->kss_lock);
+ }
+ }
+
+ spin_unlock_bh(&sched->kss_lock);
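+	/* release the per-thread scratch buffers allocated above */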
+ CFS_FREE_PTR_ARRAY(rx_scratch_pgs, LNET_MAX_IOV);
+ CFS_FREE_PTR_ARRAY(scratch_iov, LNET_MAX_IOV);
+ ksocknal_thread_fini();
+ return 0;