diff --git a/lnet/klnds/ralnd/ralnd_cb.c b/lnet/klnds/ralnd/ralnd_cb.c
index ff080f4..fc5ed3f 100644
--- a/lnet/klnds/ralnd/ralnd_cb.c
+++ b/lnet/klnds/ralnd/ralnd_cb.c
@@ -43,7 +43,9 @@ kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg)
         kra_device_t *dev;
         int           i;
         unsigned long flags;
- 
+
+        CDEBUG(D_NET, "callback for device %d\n", devid);
+
         for (i = 0; i < kranal_data.kra_ndevs; i++) {
                 dev = &kranal_data.kra_devices[i];
@@ -60,7 +62,7 @@ kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg)
                         spin_unlock_irqrestore(&dev->rad_lock, flags);
                         return;
                 }
- 
+
         CWARN("callback for unknown device %d\n", devid);
 }
@@ -69,9 +71,9 @@ kranal_schedule_conn(kra_conn_t *conn)
 {
         kra_device_t *dev = conn->rac_device;
         unsigned long flags;
- 
+
         spin_lock_irqsave(&dev->rad_lock, flags);
- 
+
         if (!conn->rac_scheduled) {
                 kranal_conn_addref(conn);       /* +1 ref for scheduler */
                 conn->rac_scheduled = 1;
@@ -83,11 +85,11 @@ kranal_schedule_conn(kra_conn_t *conn)
 }
 
 kra_tx_t *
-kranal_get_idle_tx (int may_block) 
+kranal_get_idle_tx (int may_block)
 {
         unsigned long  flags;
         kra_tx_t      *tx = NULL;
- 
+
         for (;;) {
                 spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
@@ -132,7 +134,7 @@ kranal_get_idle_tx (int may_block)
         }
 
         spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
- 
+
         return tx;
 }
@@ -159,40 +161,46 @@ kranal_new_tx_msg (int may_block, int type)
 }
 
 int
-kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov, 
+kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                                int offset, int nob)
- 
+
 {
         /* For now this is almost identical to kranal_setup_virt_buffer, but we
          * could "flatten" the payload into a single contiguous buffer ready
          * for sending direct over an FMA if we ever needed to. */
-        LASSERT (nob > 0);
-        LASSERT (niov > 0);
         LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
+        LASSERT (nob >= 0);
 
-        while (offset >= iov->iov_len) {
-                offset -= iov->iov_len;
-                niov--;
-                iov++;
+        if (nob == 0) {
+                tx->tx_buffer = NULL;
+        } else {
                 LASSERT (niov > 0);
-        }
 
-        if (nob > iov->iov_len - offset) {
-                CERROR("Can't handle multiple vaddr fragments\n");
-                return -EMSGSIZE;
+                while (offset >= iov->iov_len) {
+                        offset -= iov->iov_len;
+                        niov--;
+                        iov++;
+                        LASSERT (niov > 0);
+                }
+
+                if (nob > iov->iov_len - offset) {
+                        CERROR("Can't handle multiple vaddr fragments\n");
+                        return -EMSGSIZE;
+                }
+
+                tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
         }
 
         tx->tx_buftype = RANAL_BUF_IMMEDIATE;
         tx->tx_nob = nob;
-        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
         return 0;
 }
 
 int
-kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov, 
+kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                           int offset, int nob)
- 
+
 {
         LASSERT (nob > 0);
         LASSERT (niov > 0);
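The rewritten kranal_setup_immediate_buffer walks the iovec to the fragment containing `offset`, then requires the whole payload to sit inside that one fragment. A standalone sketch of the same walk, with hypothetical stand-in types (not part of this patch):

        #include <stddef.h>

        struct frag { void *base; size_t len; };   /* stands in for struct iovec */

        /* Return a pointer to 'nob' contiguous bytes starting 'offset' bytes
         * into the fragment list, or NULL if the range would span fragments
         * (the driver above asserts instead of returning NULL). */
        static void *frag_pointer(struct frag *v, int nv, size_t offset, size_t nob)
        {
                while (nv > 0 && offset >= v->len) {    /* skip whole fragments */
                        offset -= v->len;
                        v++;
                        nv--;
                }
                if (nv == 0 || nob > v->len - offset)   /* off the end, or range */
                        return NULL;                    /* spans two fragments */
                return (char *)v->base + offset;
        }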
@@ -239,7 +247,7 @@ kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
         tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
         tx->tx_nob = nob;
         tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));
- 
+
         phys->Address = kranal_page2phys(kiov->kiov_page);
         phys++;
@@ -250,13 +258,13 @@ kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
                 LASSERT (nkiov > 0);
 
                 if (kiov->kiov_offset != 0 ||
-                    ((resid > PAGE_SIZE) && 
+                    ((resid > PAGE_SIZE) &&
                      kiov->kiov_len < PAGE_SIZE)) {
                         /* Can't have gaps */
                         CERROR("Can't make payload contiguous in I/O VM:"
-                               "page %d, offset %d, len %d \n", 
-                               (int)(phys - tx->tx_phys), 
-                               kiov->kiov_offset, kiov->kiov_len);                        
+                               "page %d, offset %d, len %d \n",
+                               (int)(phys - tx->tx_phys),
+                               kiov->kiov_offset, kiov->kiov_len);
                         return -EINVAL;
                 }
@@ -276,15 +284,15 @@ kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
 }
 
 static inline int
-kranal_setup_rdma_buffer (kra_tx_t *tx, int niov, 
+kranal_setup_rdma_buffer (kra_tx_t *tx, int niov,
                           struct iovec *iov, ptl_kiov_t *kiov,
                           int offset, int nob)
 {
         LASSERT ((iov == NULL) != (kiov == NULL));
- 
+
         if (kiov != NULL)
                 return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);
- 
+
         return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
 }
@@ -300,13 +308,13 @@ kranal_map_buffer (kra_tx_t *tx)
         switch (tx->tx_buftype) {
         default:
                 LBUG();
- 
+
         case RANAL_BUF_NONE:
         case RANAL_BUF_IMMEDIATE:
         case RANAL_BUF_PHYS_MAPPED:
         case RANAL_BUF_VIRT_MAPPED:
                 break;
- 
+
         case RANAL_BUF_PHYS_UNMAPPED:
                 rrc = RapkRegisterPhys(dev->rad_handle,
                                        tx->tx_phys, tx->tx_phys_npages,
@@ -334,13 +342,13 @@ kranal_unmap_buffer (kra_tx_t *tx)
         switch (tx->tx_buftype) {
         default:
                 LBUG();
- 
+
         case RANAL_BUF_NONE:
         case RANAL_BUF_IMMEDIATE:
         case RANAL_BUF_PHYS_UNMAPPED:
         case RANAL_BUF_VIRT_UNMAPPED:
                 break;
- 
+
         case RANAL_BUF_PHYS_MAPPED:
                 LASSERT (tx->tx_conn != NULL);
                 dev = tx->tx_conn->rac_device;
@@ -438,11 +446,11 @@ kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
 
         /* If I get here, I've committed to send, so I complete the tx with
          * failure on any problems */
- 
+
         LASSERT (tx->tx_conn == NULL);      /* only set when assigned a conn */
 
         read_lock(g_lock);
- 
+
         peer = kranal_find_peer_locked(nid);
         if (peer == NULL) {
                 read_unlock(g_lock);
@@ -456,7 +464,7 @@ kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
                 read_unlock(g_lock);
                 return;
         }
- 
+
         /* Making one or more connections; I'll need a write lock... */
         read_unlock(g_lock);
         write_lock_irqsave(g_lock, flags);
@@ -480,26 +488,26 @@ kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
 
         if (!peer->rap_connecting) {
                 LASSERT (list_empty(&peer->rap_tx_queue));
- 
+
                 now = CURRENT_SECONDS;
                 if (now < peer->rap_reconnect_time) {
                         write_unlock_irqrestore(g_lock, flags);
                         kranal_tx_done(tx, -EHOSTUNREACH);
                         return;
                 }
- 
+
                 peer->rap_connecting = 1;
                 kranal_peer_addref(peer); /* extra ref for connd */
- 
+
                 spin_lock(&kranal_data.kra_connd_lock);
- 
+
                 list_add_tail(&peer->rap_connd_list,
                               &kranal_data.kra_connd_peers);
                 wake_up(&kranal_data.kra_connd_waitq);
- 
+
                 spin_unlock(&kranal_data.kra_connd_lock);
         }
- 
+
         /* A connection is being established; queue the message... */
         list_add_tail(&tx->tx_list, &peer->rap_tx_queue);
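kranal_launch_tx holds the global lock only for reading on the fast path, and drops it entirely before retaking it for writing; because a connection may appear in that window, the peer/connection state is re-checked after the upgrade. A minimal sketch of that drop-and-recheck pattern, using hypothetical names and a pthreads rwlock rather than the kernel primitives:

        #include <pthread.h>

        static pthread_rwlock_t g_lock = PTHREAD_RWLOCK_INITIALIZER;
        static int g_connected;                 /* state guarded by g_lock */

        static void ensure_connected(void (*connect_fn)(void))
        {
                pthread_rwlock_rdlock(&g_lock);
                if (g_connected) {              /* fast path: already connected */
                        pthread_rwlock_unlock(&g_lock);
                        return;
                }
                pthread_rwlock_unlock(&g_lock);

                pthread_rwlock_wrlock(&g_lock);
                if (!g_connected) {             /* re-check: state may have   */
                        connect_fn();           /* changed while no lock held */
                        g_connected = 1;
                }
                pthread_rwlock_unlock(&g_lock);
        }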
@@ -507,7 +515,7 @@ kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
 }
 
 void
-kranal_rdma(kra_tx_t *tx, int type, 
+kranal_rdma(kra_tx_t *tx, int type,
             kra_rdma_desc_t *sink, int nob, __u64 cookie)
 {
         kra_conn_t *conn = tx->tx_conn;
@@ -520,7 +528,7 @@ kranal_rdma(kra_tx_t *tx, int type,
 
         /* No actual race with scheduler sending CLOSE (I'm she!) */
         LASSERT (current == conn->rac_device->rad_scheduler);
- 
+
         memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
         tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
         tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
@@ -556,6 +564,7 @@ kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
         RAP_RETURN rrc;
 
         LASSERT (conn->rac_rxmsg != NULL);
+        CDEBUG(D_NET, "Consuming %p\n", conn);
 
         rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer, &nob_received,
                              sizeof(kra_msg_t));
@@ -563,25 +572,26 @@ kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
 
         conn->rac_rxmsg = NULL;
 
-        if (nob_received != nob) {
-                CWARN("Expected %d immediate bytes but got %d\n",
-                      nob, nob_received);
+        if (nob_received < nob) {
+                CWARN("Incomplete immediate msg from "LPX64
+                      ": expected %d, got %d\n",
+                      conn->rac_peer->rap_nid, nob, nob_received);
                 return -EPROTO;
         }
- 
+
         return 0;
 }
 
 ptl_err_t
-kranal_do_send (lib_nal_t *nal, 
+kranal_do_send (lib_nal_t *nal,
                 void    *private,
                 lib_msg_t *libmsg,
-                ptl_hdr_t *hdr, 
-                int type, 
-                ptl_nid_t nid, 
+                ptl_hdr_t *hdr,
+                int type,
+                ptl_nid_t nid,
                 ptl_pid_t pid,
-                unsigned int niov, 
-                struct iovec *iov, 
+                unsigned int niov,
+                struct iovec *iov,
                 ptl_kiov_t *kiov,
                 int offset,
                 int nob)
@@ -605,7 +615,7 @@ kranal_do_send (lib_nal_t *nal,
         switch(type) {
         default:
                 LBUG();
- 
+
         case PTL_MSG_REPLY: {
                 /* reply's 'private' is the conn that received the GET_REQ */
                 conn = private;
@@ -619,7 +629,7 @@ kranal_do_send (lib_nal_t *nal,
                         }
                         break;                      /* RDMA not expected */
                 }
- 
+
                 /* Incoming message consistent with immediate reply? */
                 if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
                         CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
@@ -644,6 +654,9 @@ kranal_do_send (lib_nal_t *nal,
                 kranal_rdma(tx, RANAL_MSG_GET_DONE,
                             &conn->rac_rxmsg->ram_u.get.ragm_desc, nob,
                             conn->rac_rxmsg->ram_u.get.ragm_cookie);
+
+                /* flag matched by consuming rx message */
+                kranal_consume_rxmsg(conn, NULL, 0);
                 return PTL_OK;
         }
@@ -697,10 +710,10 @@ kranal_do_send (lib_nal_t *nal,
 
         case PTL_MSG_PUT:
                 if (kiov == NULL &&             /* not paged */
-                    nob <= RANAL_MAX_IMMEDIATE &&   /* small enough */
+                    nob <= RANAL_FMA_MAX_DATA &&    /* small enough */
                     nob <= kranal_tunables.kra_max_immediate)
                         break;                  /* send IMMEDIATE */
- 
+
                 tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ);
                 if (tx == NULL)
                         return PTL_NO_SPACE;
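The PUT path above sends small, unpaged payloads inline in the FMA message (IMMEDIATE) and otherwise falls back to the PUT_REQ/PUT_ACK RDMA handshake. The decision, restated as a standalone predicate for clarity (the limits are stood in by parameters; hypothetical helper, not part of the patch):

        /* 1 => send the payload inline in the FMA message; 0 => negotiate RDMA.
         * 'paged' corresponds to kiov != NULL above; 'fma_max' to
         * RANAL_FMA_MAX_DATA; 'max_immediate' to
         * kranal_tunables.kra_max_immediate. */
        static int send_inline(int paged, int nob, int fma_max, int max_immediate)
        {
                return !paged && nob <= fma_max && nob <= max_immediate;
        }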
@@ -719,11 +732,11 @@ kranal_do_send (lib_nal_t *nal,
         }
 
         LASSERT (kiov == NULL);
-        LASSERT (nob <= RANAL_MAX_IMMEDIATE);
+        LASSERT (nob <= RANAL_FMA_MAX_DATA);
 
         tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK ||
                                  type == PTL_MSG_REPLY ||
-                                 in_interrupt()), 
+                                 in_interrupt()),
                                RANAL_MSG_IMMEDIATE);
         if (tx == NULL)
                 return PTL_NO_SPACE;
@@ -733,7 +746,7 @@ kranal_do_send (lib_nal_t *nal,
                 kranal_tx_done(tx, rc);
                 return PTL_FAIL;
         }
- 
+
         tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
         tx->tx_libmsg[0] = libmsg;
         kranal_launch_tx(tx, nid);
@@ -753,9 +766,9 @@ kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
 }
 
 ptl_err_t
-kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, 
+kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
                    ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
-                   unsigned int niov, ptl_kiov_t *kiov, 
+                   unsigned int niov, ptl_kiov_t *kiov,
                    size_t offset, size_t len)
 {
         return kranal_do_send(nal, private, cookie,
@@ -774,17 +787,26 @@ kranal_do_recv (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
         kra_tx_t    *tx;
         void        *buffer;
         int          rc;
- 
+
         LASSERT (mlen <= rlen);
         LASSERT (!in_interrupt());
         /* Either all pages or all vaddrs */
         LASSERT (!(kiov != NULL && iov != NULL));
 
+        CDEBUG(D_NET, "conn %p, rxmsg %p, libmsg %p\n", conn, rxmsg, libmsg);
+
+        if (libmsg == NULL) {
+                /* GET or ACK or portals is discarding */
+                LASSERT (mlen == 0);
+                lib_finalize(nal, NULL, libmsg, PTL_OK);
+                return PTL_OK;
+        }
+
         switch(rxmsg->ram_type) {
         default:
                 LBUG();
                 return PTL_FAIL;
- 
+
         case RANAL_MSG_IMMEDIATE:
                 if (mlen == 0) {
                         buffer = NULL;
@@ -809,21 +831,7 @@ kranal_do_recv (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
                 lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL);
                 return PTL_OK;
 
-        case RANAL_MSG_GET_REQ:
-                /* If the GET matched, we've already handled it in
-                 * kranal_do_send which is called to send the REPLY.  We're
-                 * only called here to complete the GET receive (if we needed
-                 * it which we don't, but I digress...) */
-                LASSERT (libmsg == NULL);
-                lib_finalize(nal, NULL, libmsg, PTL_OK);
-                return PTL_OK;
-
         case RANAL_MSG_PUT_REQ:
-                if (libmsg == NULL) {           /* PUT didn't match... */
-                        lib_finalize(nal, NULL, libmsg, PTL_OK);
-                        return PTL_OK;
-                }
-
                 tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK);
                 if (tx == NULL)
                         return PTL_NO_SPACE;
@@ -836,19 +844,19 @@ kranal_do_recv (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
 
                 tx->tx_conn = conn;
                 kranal_map_buffer(tx);
- 
-                tx->tx_msg.ram_u.putack.rapam_src_cookie = 
+
+                tx->tx_msg.ram_u.putack.rapam_src_cookie =
                         conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
                 tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
                 tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
-                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits = 
+                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
                         (__u64)((unsigned long)tx->tx_buffer);
                 tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;
 
                 tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */
 
                 kranal_post_fma(conn, tx);
- 
+
                 /* flag matched by consuming rx message */
                 kranal_consume_rxmsg(conn, NULL, 0);
                 return PTL_OK;
@@ -857,7 +865,7 @@ kranal_do_recv (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
 
 ptl_err_t
 kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
-             unsigned int niov, struct iovec *iov, 
+             unsigned int niov, struct iovec *iov,
              size_t offset, size_t mlen, size_t rlen)
 {
         return kranal_do_recv(nal, private, msg, niov, iov, NULL,
@@ -866,7 +874,7 @@ kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
 
 ptl_err_t
 kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
-                   unsigned int niov, ptl_kiov_t *kiov, 
+                   unsigned int niov, ptl_kiov_t *kiov,
                    size_t offset, size_t mlen, size_t rlen)
 {
         return kranal_do_recv(nal, private, msg, niov, NULL, kiov,
@@ -906,6 +914,8 @@ kranal_check_conn_timeouts (kra_conn_t *conn)
         if (!conn->rac_close_sent &&
             time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                 /* not sent in a while; schedule conn so scheduler sends a keepalive */
+                CDEBUG(D_NET, "Scheduling keepalive %p->"LPX64"\n",
+                       conn, conn->rac_peer->rap_nid);
                 kranal_schedule_conn(conn);
         }
@@ -913,14 +923,16 @@ kranal_check_conn_timeouts (kra_conn_t *conn)
 
         if (!conn->rac_close_recvd &&
             time_after_eq(now, conn->rac_last_rx + timeout)) {
-                CERROR("Nothing received from "LPX64" within %lu seconds\n",
+                CERROR("%s received from "LPX64" within %lu seconds\n",
+                       (conn->rac_state == RANAL_CONN_ESTABLISHED) ?
+                       "Nothing" : "CLOSE not",
                        conn->rac_peer->rap_nid,
                        (now - conn->rac_last_rx)/HZ);
                 return -ETIMEDOUT;
         }
 
         if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                 return 0;
- 
+
         /* Check the conn's queues are moving.  These are "belt+braces" checks,
          * in case of hardware/software errors that make this conn seem
          * responsive even though it isn't progressing its message queues. */
@@ -929,7 +941,7 @@ kranal_check_conn_timeouts (kra_conn_t *conn)
 
         list_for_each (ttmp, &conn->rac_fmaq) {
                 tx = list_entry(ttmp, kra_tx_t, tx_list);
- 
+
                 if (time_after_eq(now, tx->tx_qtime + timeout)) {
                         spin_unlock_irqrestore(&conn->rac_lock, flags);
                         CERROR("tx on fmaq for "LPX64" blocked %lu seconds\n",
@@ -937,10 +949,10 @@ kranal_check_conn_timeouts (kra_conn_t *conn)
                         return -ETIMEDOUT;
                 }
         }
- 
+
         list_for_each (ttmp, &conn->rac_rdmaq) {
                 tx = list_entry(ttmp, kra_tx_t, tx_list);
- 
+
                 if (time_after_eq(now, tx->tx_qtime + timeout)) {
                         spin_unlock_irqrestore(&conn->rac_lock, flags);
                         CERROR("tx on rdmaq for "LPX64" blocked %lu seconds\n",
@@ -948,10 +960,10 @@ kranal_check_conn_timeouts (kra_conn_t *conn)
                         return -ETIMEDOUT;
                 }
         }
- 
+
         list_for_each (ttmp, &conn->rac_replyq) {
                 tx = list_entry(ttmp, kra_tx_t, tx_list);
- 
+
                 if (time_after_eq(now, tx->tx_qtime + timeout)) {
                         spin_unlock_irqrestore(&conn->rac_lock, flags);
                         CERROR("tx on replyq for "LPX64" blocked %lu seconds\n",
@@ -959,7 +971,7 @@ kranal_check_conn_timeouts (kra_conn_t *conn)
                         return -ETIMEDOUT;
                 }
         }
- 
+
         spin_unlock_irqrestore(&conn->rac_lock, flags);
         return 0;
 }
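All of the checks above use time_after_eq(), and the reaper below computes `timeout = (long)(next_check_time - jiffies)`, for the same reason: comparisons must stay correct when the jiffies counter wraps. A sketch of why the subtract-then-sign-check form is wrap-safe, assuming a 32-bit free-running tick counter:

        #include <stdint.h>

        /* Wrap-safe "a is at or after b": the unsigned difference is
         * reinterpreted as signed, so values up to half the counter range
         * apart compare correctly even across a wrap. */
        static int ticks_after_eq(uint32_t a, uint32_t b)
        {
                return (int32_t)(a - b) >= 0;
        }

        /* e.g. a = 3 (counter just wrapped), b = 0xFFFFFFF0:
         * a - b == 0x13, so ticks_after_eq(a, b) is 1 even though a < b
         * numerically. */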
@@ -1005,12 +1017,12 @@ kranal_reaper_check (int idx, unsigned long *min_timeoutp)
         case RANAL_CONN_ESTABLISHED:
                 kranal_close_conn_locked(conn, -ETIMEDOUT);
                 break;
- 
+
         case RANAL_CONN_CLOSING:
                 kranal_terminate_conn_locked(conn);
                 break;
         }
- 
+
         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
 
         kranal_conn_decref(conn);
@@ -1025,6 +1037,7 @@ kranal_reaper_check (int idx, unsigned long *min_timeoutp)
 int
 kranal_connd (void *arg)
 {
+        long               id = (long)arg;
         char               name[16];
         wait_queue_t       wait;
         unsigned long      flags;
@@ -1032,7 +1045,7 @@ kranal_connd (void *arg)
         kra_acceptsock_t  *ras;
         int                did_something;
 
-        snprintf(name, sizeof(name), "kranal_connd_%02ld", (long)arg);
+        snprintf(name, sizeof(name), "kranal_connd_%02ld", id);
         kportal_daemonize(name);
         kportal_blockallsigs();
@@ -1042,24 +1055,29 @@ kranal_connd (void *arg)
 
         while (!kranal_data.kra_shutdown) {
                 did_something = 0;
- 
+
                 if (!list_empty(&kranal_data.kra_connd_acceptq)) {
                         ras = list_entry(kranal_data.kra_connd_acceptq.next,
                                          kra_acceptsock_t, ras_list);
                         list_del(&ras->ras_list);
+
                         spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
 
+                        CDEBUG(D_NET,"About to handshake someone\n");
+
                         kranal_conn_handshake(ras->ras_sock, NULL);
                         kranal_free_acceptsock(ras);
 
+                        CDEBUG(D_NET,"Finished handshaking someone\n");
+
                         spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                         did_something = 1;
                 }
- 
+
                 if (!list_empty(&kranal_data.kra_connd_peers)) {
                         peer = list_entry(kranal_data.kra_connd_peers.next,
                                           kra_peer_t, rap_connd_list);
- 
+
                         list_del_init(&peer->rap_connd_list);
                         spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
@@ -1075,11 +1093,11 @@ kranal_connd (void *arg)
 
                 set_current_state(TASK_INTERRUPTIBLE);
                 add_wait_queue(&kranal_data.kra_connd_waitq, &wait);
- 
+
                 spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
 
                 schedule ();
- 
+
                 set_current_state(TASK_RUNNING);
                 remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);
@@ -1093,14 +1111,14 @@ kranal_connd (void *arg)
 }
 
 void
-kranal_update_reaper_timeout(long timeout) 
+kranal_update_reaper_timeout(long timeout)
 {
         unsigned long flags;
 
         LASSERT (timeout > 0);
- 
+
         spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
- 
+
         if (timeout < kranal_data.kra_new_min_timeout)
                 kranal_data.kra_new_min_timeout = timeout;
@@ -1120,7 +1138,7 @@ kranal_reaper (void *arg)
         unsigned long      next_check_time = jiffies;
         long               next_min_timeout = MAX_SCHEDULE_TIMEOUT;
         long               current_min_timeout = 1;
- 
+
         kportal_daemonize("kranal_reaper");
         kportal_blockallsigs();
@@ -1129,95 +1147,91 @@ kranal_reaper (void *arg)
         spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
 
         while (!kranal_data.kra_shutdown) {
+                /* I wake up every 'p' seconds to check for timeouts on some
+                 * more peers.  I try to check every connection 'n' times
+                 * within the global minimum of all keepalive and timeout
+                 * intervals, to ensure I attend to every connection within
+                 * (n+1)/n times its timeout intervals. */
+                const int p = 1;
+                const int n = 3;
+                unsigned long min_timeout;
+                int chunk;
 
                 /* careful with the jiffy wrap... */
                 timeout = (long)(next_check_time - jiffies);
-                if (timeout <= 0) {
- 
-                        /* I wake up every 'p' seconds to check for
-                         * timeouts on some more peers.  I try to check
-                         * every connection 'n' times within the global
-                         * minimum of all keepalive and timeout intervals,
-                         * to ensure I attend to every connection within
-                         * (n+1)/n times its timeout intervals. */
-
-                        const int p = 1;
-                        const int n = 3;
-                        unsigned long min_timeout;
-                        int chunk;
-
-                        if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
-                                /* new min timeout set: restart min timeout scan */
-                                next_min_timeout = MAX_SCHEDULE_TIMEOUT;
-                                base_index = conn_index - 1;
-                                if (base_index < 0)
-                                        base_index = conn_entries - 1;
-
-                                if (kranal_data.kra_new_min_timeout < current_min_timeout) {
-                                        current_min_timeout = kranal_data.kra_new_min_timeout;
-                                        CWARN("Set new min timeout %ld\n",
-                                              current_min_timeout);
-                                }
-
-                                kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
-                        }
-                        min_timeout = current_min_timeout;
-
-                        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock,
-                                               flags);
-
-                        LASSERT (min_timeout > 0);
-
-                        /* Compute how many table entries to check now so I
-                         * get round the whole table fast enough (NB I do
-                         * this at fixed intervals of 'p' seconds) */
-                        chunk = conn_entries;
-                        if (min_timeout > n * p)
-                                chunk = (chunk * n * p) / min_timeout;
-                        if (chunk == 0)
-                                chunk = 1;
-
-                        for (i = 0; i < chunk; i++) {
-                                kranal_reaper_check(conn_index, 
-                                                    &next_min_timeout);
-                                conn_index = (conn_index + 1) % conn_entries;
-                        }
+                if (timeout > 0) {
+                        set_current_state(TASK_INTERRUPTIBLE);
+                        add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
 
-                        next_check_time += p * HZ;
+                        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
+
+                        schedule_timeout(timeout);
 
                         spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
 
-                        if (((conn_index - chunk <= base_index &&
-                              base_index < conn_index) ||
-                             (conn_index - conn_entries - chunk <= base_index &&
-                              base_index < conn_index - conn_entries))) {
-
-                                /* Scanned all conns: set current_min_timeout... */
-                                if (current_min_timeout != next_min_timeout) {
-                                        current_min_timeout = next_min_timeout;
-                                        CWARN("Set new min timeout %ld\n",
-                                              current_min_timeout);
-                                }
-
-                                /* ...and restart min timeout scan */
-                                next_min_timeout = MAX_SCHEDULE_TIMEOUT;
-                                base_index = conn_index - 1;
-                                if (base_index < 0)
-                                        base_index = conn_entries - 1;
-                        }
+                        set_current_state(TASK_RUNNING);
+                        remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
+                        continue;
                 }
 
-                set_current_state(TASK_INTERRUPTIBLE);
-                add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
+                if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
+                        /* new min timeout set: restart min timeout scan */
+                        next_min_timeout = MAX_SCHEDULE_TIMEOUT;
+                        base_index = conn_index - 1;
+                        if (base_index < 0)
+                                base_index = conn_entries - 1;
+
+                        if (kranal_data.kra_new_min_timeout < current_min_timeout) {
+                                current_min_timeout = kranal_data.kra_new_min_timeout;
+                                CDEBUG(D_NET, "Set new min timeout %ld\n",
+                                       current_min_timeout);
+                        }
+
+                        kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
+                }
+                min_timeout = current_min_timeout;
 
                 spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
 
-                schedule_timeout(timeout);
+                LASSERT (min_timeout > 0);
+
+                /* Compute how many table entries to check now so I get round
+                 * the whole table fast enough given that I do this at fixed
+                 * intervals of 'p' seconds */
+                chunk = conn_entries;
+                if (min_timeout > n * p)
+                        chunk = (chunk * n * p) / min_timeout;
+                if (chunk == 0)
+                        chunk = 1;
+
+                for (i = 0; i < chunk; i++) {
+                        kranal_reaper_check(conn_index,
+                                            &next_min_timeout);
+                        conn_index = (conn_index + 1) % conn_entries;
+                }
+
+                next_check_time += p * HZ;
 
                 spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
 
-                set_current_state(TASK_RUNNING);
-                remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
+                if (((conn_index - chunk <= base_index &&
+                      base_index < conn_index) ||
+                     (conn_index - conn_entries - chunk <= base_index &&
+                      base_index < conn_index - conn_entries))) {
+
+                        /* Scanned all conns: set current_min_timeout... */
+                        if (current_min_timeout != next_min_timeout) {
+                                current_min_timeout = next_min_timeout;
+                                CDEBUG(D_NET, "Set new min timeout %ld\n",
+                                       current_min_timeout);
+                        }
+
+                        /* ...and restart min timeout scan */
+                        next_min_timeout = MAX_SCHEDULE_TIMEOUT;
+                        base_index = conn_index - 1;
+                        if (base_index < 0)
+                                base_index = conn_entries - 1;
+                }
         }
 
         kranal_thread_fini();
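A worked example of the reaper's chunk arithmetic, with hypothetical numbers: with p = 1, n = 3, conn_entries = 1024 and a 60-second minimum timeout, chunk = 1024 * 3 * 1 / 60 = 51, so about 51 hash buckets are scanned per wakeup and the whole table is covered roughly three times per timeout interval. A sketch mirroring the computation (stand-alone helper, not part of the patch):

        /* Buckets to scan per wakeup so the whole table of 'entries' buckets
         * is visited about 'n' times every 'min_timeout' seconds, waking
         * every 'p' seconds (mirrors the arithmetic in kranal_reaper above). */
        static int reaper_chunk(int entries, int n, int p, long min_timeout)
        {
                int chunk = entries;

                if (min_timeout > n * p)
                        chunk = (chunk * n * p) / min_timeout;
                return chunk == 0 ? 1 : chunk;
        }

        /* reaper_chunk(1024, 3, 1, 60) == 51   (scan ~51 buckets per second)
         * reaper_chunk(1024, 3, 1, 2)  == 1024 (short timeouts: scan all)   */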
@@ -1237,8 +1251,10 @@ kranal_check_rdma_cq (kra_device_t *dev)
 
         for (;;) {
                 rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type);
-                if (rrc == RAP_NOT_DONE)
+                if (rrc == RAP_NOT_DONE) {
+                        CDEBUG(D_NET, "RDMA CQ %d empty\n", dev->rad_id);
                         return;
+                }
 
                 LASSERT (rrc == RAP_SUCCESS);
                 LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);
@@ -1248,7 +1264,7 @@ kranal_check_rdma_cq (kra_device_t *dev)
                 conn = kranal_cqid2conn_locked(cqid);
                 if (conn == NULL) {
                         /* Conn was destroyed? */
-                        CWARN("RDMA CQID lookup %d failed\n", cqid);
+                        CDEBUG(D_NET, "RDMA CQID lookup %d failed\n", cqid);
                         read_unlock(&kranal_data.kra_global_lock);
                         continue;
                 }
@@ -1256,6 +1272,9 @@ kranal_check_rdma_cq (kra_device_t *dev)
                 rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
                 LASSERT (rrc == RAP_SUCCESS);
 
+                CDEBUG(D_NET, "Completed %p\n",
+                       list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list));
+
                 spin_lock_irqsave(&conn->rac_lock, flags);
 
                 LASSERT (!list_empty(&conn->rac_rdmaq));
@@ -1268,7 +1287,7 @@ kranal_check_rdma_cq (kra_device_t *dev)
 
                 list_add_tail(&tx->tx_list, &conn->rac_fmaq);
                 tx->tx_qtime = jiffies;
- 
+
                 spin_unlock_irqrestore(&conn->rac_lock, flags);
 
                 /* Get conn's fmaq processed, now I've just put something
@@ -1292,20 +1311,26 @@ kranal_check_fma_cq (kra_device_t *dev)
 
         for (;;) {
                 rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type);
-                if (rrc != RAP_NOT_DONE)
+                if (rrc == RAP_NOT_DONE) {
+                        CDEBUG(D_NET, "FMA CQ %d empty\n", dev->rad_id);
                         return;
- 
+                }
+
                 LASSERT (rrc == RAP_SUCCESS);
 
                 if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {
 
                         read_lock(&kranal_data.kra_global_lock);
- 
+
                         conn = kranal_cqid2conn_locked(cqid);
-                        if (conn == NULL)
-                                CWARN("FMA CQID lookup %d failed\n", cqid);
-                        else
+                        if (conn == NULL) {
+                                CDEBUG(D_NET, "FMA CQID lookup %d failed\n",
+                                       cqid);
+                        } else {
+                                CDEBUG(D_NET, "FMA completed: %p CQID %d\n",
+                                       conn, cqid);
                                 kranal_schedule_conn(conn);
+                        }
 
                         read_unlock(&kranal_data.kra_global_lock);
                         continue;
@@ -1315,15 +1340,15 @@ kranal_check_fma_cq (kra_device_t *dev)
                 CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);
 
                 for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {
- 
+
                         read_lock(&kranal_data.kra_global_lock);
- 
+
                         conns = &kranal_data.kra_conns[i];
 
                         list_for_each (tmp, conns) {
-                                conn = list_entry(tmp, kra_conn_t, 
+                                conn = list_entry(tmp, kra_conn_t,
                                                   rac_hashlist);
- 
+
                                 if (conn->rac_device == dev)
                                         kranal_schedule_conn(conn);
                         }
@@ -1340,7 +1365,11 @@ kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
 {
         int        sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
         RAP_RETURN rrc;
- 
+
+        CDEBUG(D_NET,"%p sending msg %p %02x%s [%p for %d]\n",
+               conn, msg, msg->ram_type, sync ? "(sync)" : "",
+               immediate, immediatenob);
+
         LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
         LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
                  immediatenob <= RANAL_FMA_MAX_DATA :
@@ -1350,11 +1379,11 @@ kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
         msg->ram_seq = conn->rac_tx_seq;
 
         if (sync)
-                rrc = RapkFmaSyncSend(conn->rac_device->rad_handle,
+                rrc = RapkFmaSyncSend(conn->rac_rihandle,
                                       immediate, immediatenob,
                                       msg, sizeof(*msg));
         else
-                rrc = RapkFmaSend(conn->rac_device->rad_handle,
+                rrc = RapkFmaSend(conn->rac_rihandle,
                                   immediate, immediatenob,
                                   msg, sizeof(*msg));
@@ -1366,14 +1395,18 @@ kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
                 conn->rac_last_tx = jiffies;
                 conn->rac_tx_seq++;
                 return 0;
- 
+
         case RAP_NOT_DONE:
+                if (time_after_eq(jiffies,
+                                  conn->rac_last_tx + conn->rac_keepalive*HZ))
+                        CDEBUG(D_WARNING, "EAGAIN sending %02x (idle %lu secs)\n",
+                               msg->ram_type, (jiffies - conn->rac_last_tx)/HZ);
                 return -EAGAIN;
         }
 }
 
 void
-kranal_process_fmaq (kra_conn_t *conn) 
+kranal_process_fmaq (kra_conn_t *conn)
 {
         unsigned long flags;
         int           more_to_do;
@@ -1384,28 +1417,33 @@ kranal_process_fmaq (kra_conn_t *conn)
         /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
          *       However I will be rescheduled some by an FMA completion event
          *       when I eventually get some.
-         * NB 2. Sampling rac_state here, races with setting it elsewhere
-         *       kranal_close_conn_locked.  But it doesn't matter if I try to
-         *       send a "real" message just as I start closing because I'll get
-         *       scheduled to send the close anyway. */
+         * NB 2. Sampling rac_state here races with setting it elsewhere.
+         *       But it doesn't matter if I try to send a "real" message just
+         *       as I start closing because I'll get scheduled to send the
+         *       close anyway. */
+
+        /* Not racing with incoming message processing! */
+        LASSERT (current == conn->rac_device->rad_scheduler);
 
         if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
                 if (!list_empty(&conn->rac_rdmaq)) {
                         /* RDMAs in progress */
                         LASSERT (!conn->rac_close_sent);
- 
-                        if (time_after_eq(jiffies, 
-                                          conn->rac_last_tx + 
-                                          conn->rac_keepalive)) {
+
+                        if (time_after_eq(jiffies,
+                                          conn->rac_last_tx +
+                                          conn->rac_keepalive * HZ)) {
+                                CDEBUG(D_NET, "sending NOOP (rdma in progress)\n");
                                 kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                                 kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                         }
                         return;
                 }
- 
+
                 if (conn->rac_close_sent)
                         return;
 
+                CWARN("sending CLOSE to "LPX64"\n", conn->rac_peer->rap_nid);
                 kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
                 rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                 if (rc != 0)
@@ -1414,7 +1452,7 @@ kranal_process_fmaq (kra_conn_t *conn)
                 conn->rac_close_sent = 1;
                 if (!conn->rac_close_recvd)
                         return;
- 
+
                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
 
                 if (conn->rac_state == RANAL_CONN_CLOSING)
@@ -1430,14 +1468,17 @@ kranal_process_fmaq (kra_conn_t *conn)
 
                 spin_unlock_irqrestore(&conn->rac_lock, flags);
 
-                if (time_after_eq(jiffies, 
-                                  conn->rac_last_tx + conn->rac_keepalive)) {
+                if (time_after_eq(jiffies,
+                                  conn->rac_last_tx + conn->rac_keepalive * HZ)) {
+                        CDEBUG(D_NET, "sending NOOP -> "LPX64" (%p idle %lu(%ld))\n",
+                               conn->rac_peer->rap_nid, conn,
+                               (jiffies - conn->rac_last_tx)/HZ, conn->rac_keepalive);
                         kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                         kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                 }
                 return;
         }
- 
+
         tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
         list_del(&tx->tx_list);
         more_to_do = !list_empty(&conn->rac_fmaq);
@@ -1445,20 +1486,26 @@ kranal_process_fmaq (kra_conn_t *conn)
         spin_unlock_irqrestore(&conn->rac_lock, flags);
 
         expect_reply = 0;
+        CDEBUG(D_NET, "sending regular msg: %p, type %02x, cookie "LPX64"\n",
+               tx, tx->tx_msg.ram_type, tx->tx_cookie);
         switch (tx->tx_msg.ram_type) {
         default:
                 LBUG();
- 
+
         case RANAL_MSG_IMMEDIATE:
+                rc = kranal_sendmsg(conn, &tx->tx_msg,
+                                    tx->tx_buffer, tx->tx_nob);
+                expect_reply = 0;
+                break;
+
         case RANAL_MSG_PUT_NAK:
         case RANAL_MSG_PUT_DONE:
         case RANAL_MSG_GET_NAK:
         case RANAL_MSG_GET_DONE:
-                rc = kranal_sendmsg(conn, &tx->tx_msg,
-                                    tx->tx_buffer, tx->tx_nob);
+                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                 expect_reply = 0;
                 break;
- 
+
         case RANAL_MSG_PUT_REQ:
                 tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
                 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
@@ -1475,7 +1522,7 @@ kranal_process_fmaq (kra_conn_t *conn)
                 kranal_map_buffer(tx);
                 tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
                 tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
-                tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits = 
+                tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
                         (__u64)((unsigned long)tx->tx_buffer);
                 tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
                 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
@@ -1486,6 +1533,7 @@ kranal_process_fmaq (kra_conn_t *conn)
         if (rc == -EAGAIN) {
                 /* I need credits to send this.  Replace tx at the head of the
                  * fmaq and I'll get rescheduled when credits appear */
+                CDEBUG(D_NET, "EAGAIN on %p\n", conn);
                 spin_lock_irqsave(&conn->rac_lock, flags);
                 list_add(&tx->tx_list, &conn->rac_fmaq);
                 spin_unlock_irqrestore(&conn->rac_lock, flags);
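When the FMA send returns -EAGAIN (out of credits), the tx goes back at the *head* of the fmaq, not the tail: re-queueing at the tail would let later messages overtake and break per-connection ordering. A generic sketch of the pattern with a hypothetical singly-linked queue (not part of the patch):

        #include <errno.h>
        #include <stddef.h>

        struct node  { struct node *next; };
        struct queue { struct node *head, *tail; };

        static void requeue_head(struct queue *q, struct node *n)
        {
                n->next = q->head;
                q->head = n;
                if (q->tail == NULL)
                        q->tail = n;
        }

        /* Pop from the head for sending; on -EAGAIN push back at the head so
         * the failed message is retried before anything queued behind it. */
        static int flush_queue(struct queue *q, int (*send_fn)(struct node *))
        {
                struct node *n;

                while ((n = q->head) != NULL) {
                        q->head = n->next;
                        if (q->head == NULL)
                                q->tail = NULL;
                        if (send_fn(n) == -EAGAIN) {
                                requeue_head(q, n);
                                return -EAGAIN;
                        }
                }
                return 0;
        }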
@@ -1493,18 +1541,22 @@ kranal_process_fmaq (kra_conn_t *conn)
         }
 
         LASSERT (rc == 0);
- 
+
         if (!expect_reply) {
                 kranal_tx_done(tx, 0);
         } else {
+                /* LASSERT(current) above ensures this doesn't race with reply
+                 * processing */
                 spin_lock_irqsave(&conn->rac_lock, flags);
                 list_add_tail(&tx->tx_list, &conn->rac_replyq);
                 tx->tx_qtime = jiffies;
                 spin_unlock_irqrestore(&conn->rac_lock, flags);
         }
 
-        if (more_to_do)
+        if (more_to_do) {
+                CDEBUG(D_NET, "Rescheduling %p (more to do)\n", conn);
                 kranal_schedule_conn(conn);
+        }
 }
 
 static inline void
@@ -1523,23 +1575,36 @@ kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
 {
         struct list_head *ttmp;
         kra_tx_t         *tx;
- 
+        unsigned long     flags;
+
+        spin_lock_irqsave(&conn->rac_lock, flags);
+
         list_for_each(ttmp, &conn->rac_replyq) {
                 tx = list_entry(ttmp, kra_tx_t, tx_list);
- 
+
+                CDEBUG(D_NET,"Checking %p %02x/"LPX64"\n",
+                       tx, tx->tx_msg.ram_type, tx->tx_cookie);
+
                 if (tx->tx_cookie != cookie)
                         continue;
- 
+
                 if (tx->tx_msg.ram_type != type) {
+                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                         CWARN("Unexpected type %x (%x expected) "
                               "matched reply from "LPX64"\n",
                               tx->tx_msg.ram_type, type,
                               conn->rac_peer->rap_nid);
                         return NULL;
                 }
+
+                list_del(&tx->tx_list);
+                spin_unlock_irqrestore(&conn->rac_lock, flags);
                 return tx;
         }
- 
-        CWARN("Unmatched reply from "LPX64"\n", conn->rac_peer->rap_nid);
+
+        spin_unlock_irqrestore(&conn->rac_lock, flags);
+        CWARN("Unmatched reply %02x/"LPX64" from "LPX64"\n",
+              type, cookie, conn->rac_peer->rap_nid);
         return NULL;
 }
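kranal_match_reply now unlinks the matched tx from rac_replyq while still holding rac_lock, so a completion handler and the timeout scan can never both claim the same tx. A condensed sketch of that lookup-and-steal pattern, with a pthread mutex standing in for the spinlock (hypothetical names, not part of the patch):

        #include <pthread.h>
        #include <stdint.h>
        #include <stddef.h>

        struct reply {
                struct reply *next;
                uint64_t      cookie;
        };

        struct replyq {
                pthread_mutex_t lock;
                struct reply   *head;
        };

        /* Find the reply with 'cookie' and unlink it in one critical section,
         * so no other thread can return the same entry. */
        static struct reply *match_and_steal(struct replyq *q, uint64_t cookie)
        {
                struct reply **pp, *r;

                pthread_mutex_lock(&q->lock);
                for (pp = &q->head; (r = *pp) != NULL; pp = &r->next) {
                        if (r->cookie == cookie) {
                                *pp = r->next;          /* unlink under the lock */
                                break;
                        }
                }
                pthread_mutex_unlock(&q->lock);
                return r;
        }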
@@ -1556,12 +1621,19 @@ kranal_check_fma_rx (kra_conn_t *conn)
 
         if (rrc == RAP_NOT_DONE)
                 return;
- 
+
+        CDEBUG(D_NET, "RX on %p\n", conn);
+
         LASSERT (rrc == RAP_SUCCESS);
         conn->rac_last_rx = jiffies;
         seq = conn->rac_rx_seq++;
         msg = (kra_msg_t *)prefix;
 
+        /* stash message for portals callbacks; they'll NULL
+         * rac_rxmsg if they consume it */
+        LASSERT (conn->rac_rxmsg == NULL);
+        conn->rac_rxmsg = msg;
+
         if (msg->ram_magic != RANAL_MSG_MAGIC) {
                 if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
                         CERROR("Unexpected magic %08x from "LPX64"\n",
@@ -1585,7 +1657,7 @@ kranal_check_fma_rx (kra_conn_t *conn)
                 case RANAL_MSG_GET_REQ:
                         kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
                         break;
- 
+
                 default:
                         break;
                 }
@@ -1602,7 +1674,7 @@ kranal_check_fma_rx (kra_conn_t *conn)
                        msg->ram_srcnid, peer->rap_nid);
                 goto out;
         }
- 
+
         if (msg->ram_connstamp != conn->rac_peer_connstamp) {
                 CERROR("Unexpected connstamp "LPX64"("LPX64
                        " expected) from "LPX64"\n",
@@ -1610,7 +1682,7 @@ kranal_check_fma_rx (kra_conn_t *conn)
                        peer->rap_nid);
                 goto out;
         }
- 
+
         if (msg->ram_seq != seq) {
                 CERROR("Unexpected sequence number %d(%d expected) from "
                        LPX64"\n", msg->ram_seq, seq, peer->rap_nid);
@@ -1624,18 +1696,20 @@ kranal_check_fma_rx (kra_conn_t *conn)
         }
 
         if (conn->rac_close_recvd) {
-                CERROR("Unexpected message %d after CLOSE from "LPX64"\n", 
+                CERROR("Unexpected message %d after CLOSE from "LPX64"\n",
                        msg->ram_type, conn->rac_peer->rap_nid);
                 goto out;
         }
 
         if (msg->ram_type == RANAL_MSG_CLOSE) {
+                CWARN("RX CLOSE from "LPX64"\n", conn->rac_peer->rap_nid);
                 conn->rac_close_recvd = 1;
                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
 
                 if (conn->rac_state == RANAL_CONN_ESTABLISHED)
                         kranal_close_conn_locked(conn, 0);
-                else if (conn->rac_close_sent)
+                else if (conn->rac_state == RANAL_CONN_CLOSING &&
+                         conn->rac_close_sent)
                         kranal_terminate_conn_locked(conn);
 
                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
@@ -1644,19 +1718,20 @@ kranal_check_fma_rx (kra_conn_t *conn)
 
         if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                 goto out;
- 
-        conn->rac_rxmsg = msg;          /* stash message for portals callbacks */
-                                        /* they'll NULL rac_rxmsg if they consume it */
+
         switch (msg->ram_type) {
         case RANAL_MSG_NOOP:
                 /* Nothing to do; just a keepalive */
+                CDEBUG(D_NET, "RX NOOP on %p\n", conn);
                 break;
- 
+
         case RANAL_MSG_IMMEDIATE:
+                CDEBUG(D_NET, "RX IMMEDIATE on %p\n", conn);
                 lib_parse(&kranal_lib, &msg->ram_u.immediate.raim_hdr, conn);
                 break;
- 
+
         case RANAL_MSG_PUT_REQ:
+                CDEBUG(D_NET, "RX PUT_REQ on %p\n", conn);
                 lib_parse(&kranal_lib, &msg->ram_u.putreq.raprm_hdr, conn);
 
                 if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
@@ -1665,36 +1740,39 @@ kranal_check_fma_rx (kra_conn_t *conn)
                 tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK);
                 if (tx == NULL)
                         break;
- 
-                tx->tx_msg.ram_u.completion.racm_cookie = 
+
+                tx->tx_msg.ram_u.completion.racm_cookie =
                         msg->ram_u.putreq.raprm_cookie;
                 kranal_post_fma(conn, tx);
                 break;
 
         case RANAL_MSG_PUT_NAK:
+                CDEBUG(D_NET, "RX PUT_NAK on %p\n", conn);
                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                         msg->ram_u.completion.racm_cookie);
                 if (tx == NULL)
                         break;
- 
+
                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                 kranal_tx_done(tx, -ENOENT);    /* no match */
                 break;
- 
+
         case RANAL_MSG_PUT_ACK:
+                CDEBUG(D_NET, "RX PUT_ACK on %p\n", conn);
                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                         msg->ram_u.putack.rapam_src_cookie);
                 if (tx == NULL)
                         break;
 
                 kranal_rdma(tx, RANAL_MSG_PUT_DONE,
-                            &msg->ram_u.putack.rapam_desc, 
+                            &msg->ram_u.putack.rapam_desc,
                             msg->ram_u.putack.rapam_desc.rard_nob,
                             msg->ram_u.putack.rapam_dst_cookie);
                 break;
 
         case RANAL_MSG_PUT_DONE:
+                CDEBUG(D_NET, "RX PUT_DONE on %p\n", conn);
                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
                                         msg->ram_u.completion.racm_cookie);
                 if (tx == NULL)
@@ -1706,8 +1784,9 @@ kranal_check_fma_rx (kra_conn_t *conn)
                 break;
 
         case RANAL_MSG_GET_REQ:
+                CDEBUG(D_NET, "RX GET_REQ on %p\n", conn);
                 lib_parse(&kranal_lib, &msg->ram_u.get.ragm_hdr, conn);
- 
+
                 if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
                         break;
@@ -1718,24 +1797,26 @@ kranal_check_fma_rx (kra_conn_t *conn)
                 tx->tx_msg.ram_u.completion.racm_cookie = msg->ram_u.get.ragm_cookie;
                 kranal_post_fma(conn, tx);
                 break;
- 
+
         case RANAL_MSG_GET_NAK:
+                CDEBUG(D_NET, "RX GET_NAK on %p\n", conn);
                 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                         msg->ram_u.completion.racm_cookie);
                 if (tx == NULL)
                         break;
- 
+
                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                 kranal_tx_done(tx, -ENOENT);    /* no match */
                 break;
- 
+
         case RANAL_MSG_GET_DONE:
+                CDEBUG(D_NET, "RX GET_DONE on %p\n", conn);
                 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                         msg->ram_u.completion.racm_cookie);
                 if (tx == NULL)
                         break;
- 
+
                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                 kranal_tx_done(tx, 0);
@@ -1751,27 +1832,34 @@ kranal_check_fma_rx (kra_conn_t *conn)
 }
 
 void
-kranal_complete_closed_conn (kra_conn_t *conn) 
+kranal_complete_closed_conn (kra_conn_t *conn)
 {
         kra_tx_t *tx;
+        int       nfma;
+        int       nreplies;
 
         LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
+        LASSERT (list_empty(&conn->rac_list));
+        LASSERT (list_empty(&conn->rac_hashlist));
 
-        while (!list_empty(&conn->rac_fmaq)) {
-                tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
- 
+        for (nfma = 0; !list_empty(&conn->rac_fmaq); nfma++) {
+                tx = list_entry(conn->rac_fmaq.next, kra_tx_t,
+                                tx_list);
+
                 list_del(&tx->tx_list);
                 kranal_tx_done(tx, -ECONNABORTED);
         }
- 
+
         LASSERT (list_empty(&conn->rac_rdmaq));
 
-        while (!list_empty(&conn->rac_replyq)) {
-                tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
- 
+        for (nreplies = 0; !list_empty(&conn->rac_replyq); nreplies++) {
+                tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
+
                 list_del(&tx->tx_list);
                 kranal_tx_done(tx, -ECONNABORTED);
         }
+
+        CDEBUG(D_WARNING, "Closed conn %p -> "LPX64": nmsg %d nreplies %d\n",
+               conn, conn->rac_peer->rap_nid, nfma, nreplies);
 }
@@ -1791,11 +1879,14 @@ kranal_scheduler (void *arg)
         dev->rad_scheduler = current;
         init_waitqueue_entry(&wait, current);
 
+        /* prevent connd from doing setri until requested */
+        down(&dev->rad_setri_mutex);
+
         spin_lock_irqsave(&dev->rad_lock, flags);
 
         while (!kranal_data.kra_shutdown) {
                 /* Safe: kra_shutdown only set when quiescent */
- 
+
                 if (busy_loops++ >= RANAL_RESCHED) {
                         spin_unlock_irqrestore(&dev->rad_lock, flags);
@@ -1805,6 +1896,19 @@ kranal_scheduler (void *arg)
                         spin_lock_irqsave(&dev->rad_lock, flags);
                 }
 
+                /* Ghastly hack to ensure RapkSetRiParams() serialises with
+                 * other comms */
+                if (dev->rad_setri_please != 0) {
+                        spin_unlock_irqrestore(&dev->rad_lock, flags);
+                        up(&dev->rad_setri_mutex);
+
+                        wait_event_interruptible(dev->rad_waitq,
+                                                 dev->rad_setri_please == 0);
+
+                        down(&dev->rad_setri_mutex);
+                        spin_lock_irqsave(&dev->rad_lock, flags);
+                }
+
                 if (dev->rad_ready) {
                         /* Device callback fired since I last checked it */
                         dev->rad_ready = 0;
@@ -1815,7 +1919,7 @@ kranal_scheduler (void *arg)
 
                         spin_lock_irqsave(&dev->rad_lock, flags);
                 }
- 
+
                 if (!list_empty(&dev->rad_connq)) {
                         /* Connection needs attention */
                         conn = list_entry(dev->rad_connq.next,
@@ -1832,11 +1936,15 @@ kranal_scheduler (void *arg)
                         kranal_complete_closed_conn(conn);
 
                         kranal_conn_decref(conn);
- 
+
                         spin_lock_irqsave(&dev->rad_lock, flags);
                         continue;
                 }
 
+                /* recheck device callback fired before sleeping */
+                if (dev->rad_ready)
+                        continue;
+
                 add_wait_queue(&dev->rad_waitq, &wait);
                 set_current_state(TASK_INTERRUPTIBLE);
@@ -1852,6 +1960,7 @@ kranal_scheduler (void *arg)
         }
 
         spin_unlock_irqrestore(&dev->rad_lock, flags);
+        up(&dev->rad_setri_mutex);
 
         dev->rad_scheduler = NULL;
         kranal_thread_fini();