}
spin_lock_irqsave (&svc->srv_lock, flags);
- list_add(&rqbd->rqbd_list, &srv_ni->sni_rqbds);
+ list_add(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
svc->srv_nbufs++;
spin_unlock_irqrestore (&svc->srv_lock, flags);
(large->tv_usec - small->tv_usec);
}
+static int
+ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc)
+{
+ struct ptlrpc_srv_ni *srv_ni;
+ struct ptlrpc_request_buffer_desc *rqbd;
+ unsigned long flags;
+ int rc;
+
+ spin_lock_irqsave(&svc->srv_lock, flags);
+ if (list_empty (&svc->srv_idle_rqbds)) {
+ spin_unlock_irqrestore(&svc->srv_lock, flags);
+ return (0);
+ }
+
+ rqbd = list_entry(svc->srv_idle_rqbds.next,
+ struct ptlrpc_request_buffer_desc,
+ rqbd_list);
+ list_del (&rqbd->rqbd_list);
+
+ /* assume we will post successfully */
+ srv_ni = rqbd->rqbd_srv_ni;
+ srv_ni->sni_nrqbd_receiving++;
+ list_add (&rqbd->rqbd_list, &srv_ni->sni_active_rqbds);
+
+ spin_unlock_irqrestore(&svc->srv_lock, flags);
+
+ rc = ptlrpc_register_rqbd(rqbd);
+ if (rc == 0)
+ return (1);
+
+ spin_lock_irqsave(&svc->srv_lock, flags);
+
+ srv_ni->sni_nrqbd_receiving--;
+ list_del(&rqbd->rqbd_list);
+ list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
+
+ if (srv_ni->sni_nrqbd_receiving == 0) {
+ /* This service is off-air on this interface because all
+ * its request buffers are busy. Portals will have started
+ * dropping incoming requests until more buffers get
+ * posted */
+ CERROR("All %s %s request buffers busy\n",
+ svc->srv_name, srv_ni->sni_ni->pni_name);
+ }
+
+ spin_unlock_irqrestore (&svc->srv_lock, flags);
+
+ return (-1);
+}
+
struct ptlrpc_service *
ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size,
int req_portal, int rep_portal,
service->srv_handler = handler;
INIT_LIST_HEAD(&service->srv_request_queue);
+ INIT_LIST_HEAD(&service->srv_idle_rqbds);
INIT_LIST_HEAD(&service->srv_reply_queue);
/* First initialise enough for early teardown */
srv_ni->sni_service = service;
srv_ni->sni_ni = &ptlrpc_interfaces[i];
- INIT_LIST_HEAD(&srv_ni->sni_rqbds);
+ INIT_LIST_HEAD(&srv_ni->sni_active_rqbds);
INIT_LIST_HEAD(&srv_ni->sni_active_replies);
}
srv_ni->sni_ni->pni_name);
GOTO(failed, NULL);
}
- ptlrpc_register_rqbd (rqbd);
+
+ /* We shouldn't be under memory pressure at
+ * startup, so fail if we can't post all our
+ * buffers at this time. */
+ if (ptlrpc_server_post_idle_rqbds(service) <= 0)
+ GOTO(failed, NULL);
}
}
return NULL;
}
+static void
+ptlrpc_server_free_request(struct ptlrpc_service *svc, struct ptlrpc_request *req)
+{
+ unsigned long flags;
+ int refcount;
+
+ spin_lock_irqsave(&svc->srv_lock, flags);
+ svc->srv_n_active_reqs--;
+ refcount = --(req->rq_rqbd->rqbd_refcount);
+ if (refcount == 0) {
+ /* request buffer is now idle */
+ list_del(&req->rq_rqbd->rqbd_list);
+ list_add_tail(&req->rq_rqbd->rqbd_list,
+ &svc->srv_idle_rqbds);
+ }
+ spin_unlock_irqrestore(&svc->srv_lock, flags);
+
+ ptlrpc_free_server_req(req);
+}
+
static int
ptlrpc_server_handle_request (struct ptlrpc_service *svc)
{
struct timeval work_start;
struct timeval work_end;
long timediff;
- int refcount;
int rc;
ENTRY;
}
}
- spin_lock_irqsave(&svc->srv_lock, flags);
- svc->srv_n_active_reqs--;
- refcount = --(request->rq_rqbd->rqbd_refcount);
- spin_unlock_irqrestore(&svc->srv_lock, flags);
-
- if (refcount == 0) {
- /* rqbd now idle: repost */
- ptlrpc_register_rqbd(request->rq_rqbd);
- }
-
- ptlrpc_free_server_req(request);
-
+ ptlrpc_server_free_request(svc, request);
+
RETURN(1);
}
liblustre_check_services (void *arg)
{
int did_something = 0;
+ int rc;
struct list_head *tmp, *nxt;
ENTRY;
svc->srv_nthreads++;
- while (ptlrpc_server_handle_reply (svc))
- did_something++;
-
- while (ptlrpc_server_handle_request (svc))
- did_something++;
-
+ do {
+ rc = ptlrpc_server_handle_reply(svc);
+ rc |= ptlrpc_server_handle_request(svc);
+ rc |= (ptlrpc_server_post_idle_rqbds(svc) > 0);
+ did_something |= rc;
+ } while (rc);
+
svc->srv_nthreads--;
}
reparent_to_init();
}
+static int
+ptlrpc_retry_rqbds(void *arg)
+{
+ struct ptlrpc_service *svc = (struct ptlrpc_service *)arg;
+
+ svc->srv_rqbd_timeout = 0;
+ return (-ETIMEDOUT);
+}
+
static int ptlrpc_main(void *arg)
{
struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
while ((thread->t_flags & SVC_STOPPING) == 0 ||
svc->srv_n_difficult_replies != 0) {
/* Don't exit while there are replies to be handled */
- struct l_wait_info lwi = { 0 };
-
+ struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
+ ptlrpc_retry_rqbds, svc);
+
l_wait_event_exclusive (svc->srv_waitq,
- (thread->t_flags & SVC_STOPPING) != 0 ||
+ ((thread->t_flags & SVC_STOPPING) != 0 &&
+ svc->srv_n_difficult_replies == 0) ||
+ (!list_empty(&svc->srv_idle_rqbds) &&
+ svc->srv_rqbd_timeout == 0) ||
!list_empty (&svc->srv_reply_queue) ||
(!list_empty (&svc->srv_request_queue) &&
(svc->srv_n_difficult_replies == 0 ||
(svc->srv_n_difficult_replies == 0 ||
svc->srv_n_active_reqs < (svc->srv_nthreads - 1)))
ptlrpc_server_handle_request (svc);
+
+ if (!list_empty(&svc->srv_idle_rqbds) &&
+ ptlrpc_server_post_idle_rqbds(svc) < 0) {
+ /* I just failed to repost request buffers. Wait
+ * for a timeout (unless something else happens)
+ * before I try again */
+ svc->srv_rqbd_timeout = HZ/10;
+ }
}
spin_lock_irqsave(&svc->srv_lock, flags);
service->srv_name, srv_ni->sni_ni->pni_name);
/* Unlink all the request buffers. This forces a 'final'
- * event with its 'unlink' flag set for each rqbd */
- list_for_each(tmp, &srv_ni->sni_rqbds) {
+ * event with its 'unlink' flag set for each posted rqbd */
+ list_for_each(tmp, &srv_ni->sni_active_rqbds) {
struct ptlrpc_request_buffer_desc *rqbd =
list_entry(tmp, struct ptlrpc_request_buffer_desc,
rqbd_list);
list_del(&req->rq_list);
service->srv_n_queued_reqs--;
- req->rq_rqbd->rqbd_refcount--;
-
- ptlrpc_free_server_req(req);
+ service->srv_n_active_reqs++;
+
+ ptlrpc_server_free_request(service, req);
}
LASSERT(service->srv_n_queued_reqs == 0);
+ LASSERT(service->srv_n_active_reqs == 0);
- /* Now free all the request buffers since nothing references them
- * any more... */
for (i = 0; i < ptlrpc_ninterfaces; i++) {
srv_ni = &service->srv_interfaces[i];
+ LASSERT(list_empty(&srv_ni->sni_active_rqbds));
+ }
- while (!list_empty(&srv_ni->sni_rqbds)) {
- struct ptlrpc_request_buffer_desc *rqbd =
- list_entry(srv_ni->sni_rqbds.next,
- struct ptlrpc_request_buffer_desc,
- rqbd_list);
+ /* Now free all the request buffers since nothing references them
+ * any more... */
+ while (!list_empty(&service->srv_idle_rqbds)) {
+ struct ptlrpc_request_buffer_desc *rqbd =
+ list_entry(service->srv_idle_rqbds.next,
+ struct ptlrpc_request_buffer_desc,
+ rqbd_list);
- ptlrpc_free_rqbd(rqbd);
- }
+ ptlrpc_free_rqbd(rqbd);
}
/* wait for all outstanding replies to complete (they were