#include <portals/types.h>
#include "ptlrpc_internal.h"
+/* forward ref */
+static int ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc);
+
+
static LIST_HEAD (ptlrpc_all_services);
static spinlock_t ptlrpc_all_services_lock = SPIN_LOCK_UNLOCKED;
OBD_FREE (rqbd, sizeof (*rqbd));
}
+int
+ptlrpc_grow_req_bufs(struct ptlrpc_srv_ni *srv_ni)
+{
+ struct ptlrpc_service *svc = srv_ni->sni_service;
+ struct ptlrpc_request_buffer_desc *rqbd;
+ int i;
+
+ for (i = 0; i < svc->srv_nbuf_per_group; i++) {
+ rqbd = ptlrpc_alloc_rqbd(srv_ni);
+
+ if (rqbd == NULL) {
+ CERROR ("%s/%s: Can't allocate request buffer\n",
+ svc->srv_name, srv_ni->sni_ni->pni_name);
+ return (-ENOMEM);
+ }
+
+ if (ptlrpc_server_post_idle_rqbds(svc) < 0)
+ return (-EAGAIN);
+ }
+
+ return (0);
+}
+
void
ptlrpc_save_llog_lock(struct ptlrpc_request *req, struct llog_create_locks *lcl)
{
struct ptlrpc_request_buffer_desc *rqbd;
unsigned long flags;
int rc;
+ int posted = 0;
- spin_lock_irqsave(&svc->srv_lock, flags);
- if (list_empty (&svc->srv_idle_rqbds)) {
+ for (;;) {
+ spin_lock_irqsave(&svc->srv_lock, flags);
+ if (list_empty (&svc->srv_idle_rqbds)) {
+ spin_unlock_irqrestore(&svc->srv_lock, flags);
+ return (posted);
+ }
+ rqbd = list_entry(svc->srv_idle_rqbds.next,
+ struct ptlrpc_request_buffer_desc,
+ rqbd_list);
+ list_del (&rqbd->rqbd_list);
+
+ /* assume we will post successfully */
+ srv_ni = rqbd->rqbd_srv_ni;
+ srv_ni->sni_nrqbd_receiving++;
+ list_add (&rqbd->rqbd_list, &srv_ni->sni_active_rqbds);
spin_unlock_irqrestore(&svc->srv_lock, flags);
- return (0);
- }
-
- rqbd = list_entry(svc->srv_idle_rqbds.next,
- struct ptlrpc_request_buffer_desc,
- rqbd_list);
- list_del (&rqbd->rqbd_list);
- /* assume we will post successfully */
- srv_ni = rqbd->rqbd_srv_ni;
- srv_ni->sni_nrqbd_receiving++;
- list_add (&rqbd->rqbd_list, &srv_ni->sni_active_rqbds);
+ rc = ptlrpc_register_rqbd(rqbd);
+ if (rc != 0)
+ break;
- spin_unlock_irqrestore(&svc->srv_lock, flags);
+ posted = 1;
+ }
- rc = ptlrpc_register_rqbd(rqbd);
- if (rc == 0)
- return (1);
spin_lock_irqsave(&svc->srv_lock, flags);
struct proc_dir_entry *proc_entry)
{
int i;
- int j;
+ int rc;
int ssize;
struct ptlrpc_service *service;
struct ptlrpc_srv_ni *srv_ni;
- struct ptlrpc_request_buffer_desc *rqbd;
ENTRY;
LASSERT (ptlrpc_ninterfaces > 0);
INIT_LIST_HEAD(&service->srv_threads);
init_waitqueue_head(&service->srv_waitq);
+ service->srv_nbuf_per_group = nbufs;
service->srv_max_req_size = max_req_size;
service->srv_buf_size = bufsize;
service->srv_rep_portal = rep_portal;
CDEBUG (D_NET, "%s: initialising interface %s\n", name,
srv_ni->sni_ni->pni_name);
- for (j = 0; j < nbufs; j++) {
- rqbd = ptlrpc_alloc_rqbd (srv_ni);
-
- if (rqbd == NULL) {
- CERROR ("%s.%d: Can't allocate request %d "
- "on %s\n", name, i, j,
- srv_ni->sni_ni->pni_name);
- GOTO(failed, NULL);
- }
-
- /* We shouldn't be under memory pressure at
- * startup, so fail if we can't post all our
- * buffers at this time. */
- if (ptlrpc_server_post_idle_rqbds(service) <= 0)
- GOTO(failed, NULL);
- }
+ rc = ptlrpc_grow_req_bufs(srv_ni);
+ /* We shouldn't be under memory pressure at startup, so
+ * fail if we can't post all our buffers at this time. */
+ if (rc != 0)
+ GOTO(failed, NULL);
}
if (proc_entry != NULL)
ptlrpc_free_server_req(req);
}
+static char str[PTL_NALFMT_SIZE];
static int
ptlrpc_server_handle_request (struct ptlrpc_service *svc)
{
struct timeval work_end;
long timediff;
int rc;
- char str[PTL_NALFMT_SIZE];
ENTRY;
spin_lock_irqsave (&svc->srv_lock, flags);
* client's timeout is similar to mine, she'll be timing out this
* REQ anyway (bug 1502) */
if (timediff / 1000000 > (long)obd_timeout) {
- CERROR("Dropping timed-out request from %s: %ld seconds old\n",
- ptlrpc_peernid2str(&request->rq_peer, str),
+ CERROR("Dropping timed-out opc %d request from %s"
+ ": %ld seconds old\n", request->rq_reqmsg->opc,
+ ptlrpc_peernid2str(&request->rq_peer, str),
timediff / 1000000);
goto out;
}
request->rq_peer.peer_ni->pni_name,
ptlrpc_peernid2str(&request->rq_peer, str),
request->rq_reqmsg->opc);
-
+ request->rq_svc = svc;
rc = svc->srv_handler(request);
+ request->rq_svc = NULL;
CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:ni:nid:opc "
"%s:%s+%d:%d:"LPU64":%s:%s:%d\n", current->comm,
(request->rq_export ?
reparent_to_init();
}
+static void
+ptlrpc_check_rqbd_pools(struct ptlrpc_service *svc)
+{
+ struct ptlrpc_srv_ni *sni;
+ int i;
+ int avail = 0;
+ int low_water = svc->srv_nbuf_per_group/2;
+
+ for (i = 0; i < ptlrpc_ninterfaces; i++) {
+ sni = &svc->srv_interfaces[i];
+
+ avail += sni->sni_nrqbd_receiving;
+ /* NB I'm not locking; just looking. */
+ if (sni->sni_nrqbd_receiving <= low_water)
+ ptlrpc_grow_req_bufs(sni);
+ }
+
+ lprocfs_counter_add(svc->srv_stats, PTLRPC_REQBUF_AVAIL_CNTR, avail);
+}
+
static int
ptlrpc_retry_rqbds(void *arg)
{
svc->srv_n_active_reqs <
(svc->srv_nthreads - 1))),
&lwi);
-
+
+ ptlrpc_check_rqbd_pools(svc);
+
if (!list_empty (&svc->srv_reply_queue))
ptlrpc_server_handle_reply (svc);