#include <linux/lustre_ha.h>
#include <linux/lustre_import.h>
-/* default rpc ring length */
-//#define RPC_RING_LENGTH 10
-#define RPC_REQUEST_QUEUE_DEPTH 1024
+/* The following constants determine how much memory is devoted to
+ * buffering in the lustre services.
+ *
+ * ?_NEVENTS # event queue entries
+ *
+ * ?_NBUFS # request buffers
+ * ?_BUFSIZE # bytes in a single request buffer
+ * total memory = ?_NBUFS * ?_BUFSIZE
+ *
+ * ?_MAXREQSIZE # maximum request size (bytes) the service will receive;
+ * larger messages will get dropped.
+ * request buffers are auto-unlinked when less than ?_MAXREQSIZE
+ * bytes are left in them.
+ */
+
+#define LDLM_NEVENTS 1024 /* LDLM service: event queue entries */
+#define LDLM_NBUFS 10 /* LDLM service: request buffers */
+#define LDLM_BUFSIZE (64 * 1024) /* 64 KiB per buffer; 10 buffers => 640 KiB total */
+#define LDLM_MAXREQSIZE 1024 /* largest request (bytes) the LDLM service accepts */
+
+#define MDS_NEVENTS 1024 /* MDS service: event queue entries */
+#define MDS_NBUFS 10 /* MDS service: request buffers */
+#define MDS_BUFSIZE (64 * 1024) /* 64 KiB per buffer; 10 buffers => 640 KiB total */
+#define MDS_MAXREQSIZE 1024 /* largest request (bytes) the MDS service accepts */
+
+#ifdef __arch_um__ /* NOTE(review): presumably User-Mode Linux builds, given a smaller footprint - confirm */
+#define OST_NEVENTS 1024 /* OST service: event queue entries */
+#define OST_NBUFS 10 /* OST service: request buffers */
+#define OST_BUFSIZE (64 * 1024) /* 64 KiB per buffer; 10 buffers => 640 KiB total */
+#define OST_MAXREQSIZE (8 * 1024) /* largest request (bytes) the OST service accepts */
+#else /* !__arch_um__ */
+#define OST_NEVENTS 4096 /* OST service: event queue entries */
+#define OST_NBUFS 40 /* OST service: request buffers */
+#define OST_BUFSIZE (128 * 1024) /* 128 KiB per buffer; 40 buffers => 5 MiB total */
+#define OST_MAXREQSIZE (8 * 1024) /* same request-size limit as the __arch_um__ branch */
+#endif /* __arch_um__ */
struct ptlrpc_connection { /* NOTE(review): only c_link is visible in this excerpt; other members may be elided */
struct list_head c_link; /* list linkage; the list it is threaded on is not shown here - TODO confirm */
};
struct ptlrpc_request_buffer_desc { /* describes one incoming-request buffer belonging to a ptlrpc service */
+ struct list_head rqbd_list; /* entry on the owning service's srv_rqbds list */
struct ptlrpc_service *rqbd_service; /* service this buffer was allocated for */
ptl_handle_me_t rqbd_me_h; /* ME handle: set invalid at allocation, passed to PtlMEUnlink() at service teardown */
+ atomic_t rqbd_refcount; /* set to 0 at allocation; the request handler re-links the buffer via ptlrpc_link_svc_me() when it drops this to 0 */
char *rqbd_buffer; /* request storage, srv_buf_size bytes */
};
/* incoming request buffers */
/* FIXME: perhaps a list of EQs, if multiple NIs are used? */
- struct ptlrpc_request_buffer_desc *srv_rqbds; /* all the request buffer descriptors */
+ __u32 srv_max_req_size; /* biggest request to receive */
+ __u32 srv_buf_size; /* # bytes in a request buffer */
+ struct list_head srv_rqbds; /* all the request buffer descriptors */
+ __u32 srv_nrqbds; /* # request buffers */
+ atomic_t srv_nrqbds_receiving; /* # request buffers posted for input */
- __u32 srv_buf_size; /* # bytes in a request buffer */
- __u32 srv_nbuffs; /* # request buffers */
__u32 srv_req_portal;
__u32 srv_rep_portal;
wait_queue_head_t srv_waitq; /* all threads sleep on this */
spinlock_t srv_lock;
- struct list_head srv_reqs;
struct list_head srv_threads;
int (*srv_handler)(struct ptlrpc_request *req);
char *srv_name; /* only statically allocated strings here; we don't clean them */
/* rpc/service.c */
struct ptlrpc_service *
-ptlrpc_init_svc(__u32 bufsize, int nbuffs, int req_portal, int rep_portal,
+ptlrpc_init_svc(__u32 nevents, __u32 nbufs, __u32 bufsize, __u32 max_req_size,
+ int req_portal, int rep_portal,
obd_uuid_t uuid, svc_handler_t, char *name);
void ptlrpc_stop_all_threads(struct ptlrpc_service *svc);
int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
}
struct ptlrpc_service *
-ptlrpc_init_svc(__u32 bufsize, int nbuffs, int req_portal, int rep_portal,
+ptlrpc_init_svc(__u32 nevents, __u32 nbufs,
+ __u32 bufsize, __u32 max_req_size,
+ int req_portal, int rep_portal,
obd_uuid_t uuid, svc_handler_t handler, char *name)
{
int err;
service->srv_name = name;
spin_lock_init(&service->srv_lock);
- INIT_LIST_HEAD(&service->srv_reqs);
INIT_LIST_HEAD(&service->srv_threads);
init_waitqueue_head(&service->srv_waitq);
+ service->srv_max_req_size = max_req_size;
service->srv_buf_size = bufsize;
- service->srv_nbuffs = nbuffs;
+ INIT_LIST_HEAD (&service->srv_rqbds);
+ service->srv_nrqbds = 0;
+ atomic_set (&service->srv_nrqbds_receiving, 0);
+
service->srv_rep_portal = rep_portal;
service->srv_req_portal = req_portal;
service->srv_handler = handler;
RETURN(NULL);
}
- /* NB We need exactly 1 event for each buffer we queue */
- rc = PtlEQAlloc(service->srv_self.peer_ni, service->srv_nbuffs,
+ rc = PtlEQAlloc(service->srv_self.peer_ni, nevents,
request_in_callback, &(service->srv_eq_h));
if (rc != PTL_OK) {
CERROR("PtlEQAlloc failed: %d\n", rc);
- LBUG();
OBD_FREE(service, sizeof(*service));
RETURN(NULL);
}
- OBD_ALLOC(service->srv_rqbds, service->srv_nbuffs *
- sizeof(struct ptlrpc_request_buffer_desc));
- if (service->srv_rqbds == NULL) {
- CERROR("no memory\n");
- LBUG();
- GOTO(failed, NULL);
- }
+ for (i = 0; i < nbufs; i++) {
+ struct ptlrpc_request_buffer_desc *rqbd;
- for (i = 0; i < service->srv_nbuffs; i++) {
- struct ptlrpc_request_buffer_desc *rqbd =&service->srv_rqbds[i];
+ OBD_ALLOC (rqbd, sizeof (*rqbd));
+ if (rqbd == NULL)
+ {
+ CERROR ("no memory\n");
+ GOTO (failed, NULL);
+ }
rqbd->rqbd_service = service;
ptl_set_inv_handle (&rqbd->rqbd_me_h);
+ atomic_set (&rqbd->rqbd_refcount, 0);
OBD_ALLOC(rqbd->rqbd_buffer, service->srv_buf_size);
if (rqbd->rqbd_buffer == NULL) {
+ OBD_FREE (rqbd, sizeof (*rqbd));
CERROR("no memory\n");
- LBUG();
GOTO(failed, NULL);
}
+ list_add (&rqbd->rqbd_list, &service->srv_rqbds);
+ service->srv_nrqbds++;
+
ptlrpc_link_svc_me(rqbd);
}
/* FIXME: If we move to an event-driven model, we should put the request
* on the stack of mds_handle instead. */
+
+ LASSERT (atomic_read (&rqbd->rqbd_refcount) > 0);
LASSERT ((event->mem_desc.options & PTL_MD_IOV) == 0);
LASSERT (rqbd->rqbd_service == svc);
LASSERT (rqbd->rqbd_buffer == event->mem_desc.start);
- LASSERT (event->offset == 0);
+ LASSERT (event->offset + event->mlength <= svc->srv_buf_size);
memset(request, 0, sizeof(*request));
request->rq_svc = svc;
request->rq_obd = obddev;
request->rq_xid = event->match_bits;
request->rq_reqmsg = event->mem_desc.start + event->offset;
- request->rq_reqlen = event->mem_desc.length;
+ request->rq_reqlen = event->mlength;
+
+ rc = -EINVAL;
if (request->rq_reqlen < sizeof(struct lustre_msg)) {
CERROR("incomplete request (%d): ptl %d from "LPX64" xid "LPD64"\n",
request->rq_reqlen, svc->srv_req_portal,
event->initiator.nid, request->rq_xid);
- spin_unlock(&svc->srv_lock);
- RETURN(-EINVAL);
+ goto out;
}
if (NTOH__u32(request->rq_reqmsg->type) != PTL_RPC_MSG_REQUEST) {
CERROR("wrong packet type received (type=%u)\n",
request->rq_reqmsg->type);
- LBUG();
- spin_unlock(&svc->srv_lock);
- RETURN(-EINVAL);
+ goto out;
}
if (request->rq_reqmsg->magic != PTLRPC_MSG_MAGIC) {
CERROR("wrong lustre_msg magic %d: ptl %d from "LPX64" xid "LPD64"\n",
request->rq_reqmsg->magic, svc->srv_req_portal,
event->initiator.nid, request->rq_xid);
- spin_unlock(&svc->srv_lock);
- RETURN(-EINVAL);
+ goto out;
}
if (request->rq_reqmsg->version != PTLRPC_MSG_VERSION) {
CERROR("wrong lustre_msg version %d: ptl %d from "LPX64" xid "LPD64"\n",
request->rq_reqmsg->version, svc->srv_req_portal,
event->initiator.nid, request->rq_xid);
- spin_unlock(&svc->srv_lock);
- RETURN(-EINVAL);
+ goto out;
}
CDEBUG(D_NET, "got req "LPD64"\n", request->rq_xid);
ptlrpc_connection_addref(request->rq_connection);
}
- spin_unlock(&svc->srv_lock);
-
rc = svc->srv_handler(request);
ptlrpc_put_connection(request->rq_connection);
- ptlrpc_link_svc_me (rqbd);
+ out:
+ if (atomic_dec_and_test (&rqbd->rqbd_refcount)) /* last reference? */
+ ptlrpc_link_svc_me (rqbd);
+
return rc;
}
wait_event(svc->srv_waitq,
ptlrpc_check_event(svc, thread, event));
- spin_lock(&svc->srv_lock);
-
if (thread->t_flags & SVC_STOPPING) {
+ spin_lock(&svc->srv_lock);
thread->t_flags &= ~SVC_STOPPING;
spin_unlock(&svc->srv_lock);
+
EXIT;
break;
}
if (thread->t_flags & SVC_EVENT) {
- LASSERT (event->sequence != 0);
+ spin_lock(&svc->srv_lock);
+ thread->t_flags &= ~SVC_EVENT;
+ spin_unlock(&svc->srv_lock);
+
rc = handle_incoming_request(obddev, svc, event,
request);
- thread->t_flags &= ~SVC_EVENT;
continue;
}
CERROR("unknown break in service");
- spin_unlock(&svc->srv_lock);
+ LBUG();
EXIT;
break;
}
int ptlrpc_unregister_service(struct ptlrpc_service *service)
{
- int rc, i;
-
- /* NB service->srv_nbuffs gets set before we attempt (and possibly
- * fail) to allocate srv_rqbds.
- */
- if (service->srv_rqbds != NULL) {
- for (i = 0; i < service->srv_nbuffs; i++) {
- struct ptlrpc_request_buffer_desc *rqbd =
- &service->srv_rqbds[i];
-
- if (rqbd->rqbd_buffer == NULL) /* no buffer allocated */
- continue; /* => never initialised */
-
- /* Buffer allocated => got linked */
- LASSERT (ptl_is_valid_handle (&rqbd->rqbd_me_h));
+ int rc;
- rc = PtlMEUnlink(rqbd->rqbd_me_h);
- if (rc)
- CERROR("PtlMEUnlink failed: %d\n", rc);
+ LASSERT (list_empty (&service->srv_threads));
- OBD_FREE(rqbd->rqbd_buffer, service->srv_buf_size);
- }
-
- OBD_FREE(service->srv_rqbds, service->srv_nbuffs *
- sizeof (struct ptlrpc_request_buffer_desc));
+ /* XXX We could reply (with failure) to all buffered requests
+ * _after_ unlinking _all_ the request buffers, but _before_
+ * freeing them.
+ */
+
+ while (!list_empty (&service->srv_rqbds))
+ {
+ struct ptlrpc_request_buffer_desc *rqbd =
+ list_entry (service->srv_rqbds.next,
+ struct ptlrpc_request_buffer_desc,
+ rqbd_list);
+
+ list_del (&rqbd->rqbd_list);
+
+ LASSERT (atomic_read (&rqbd->rqbd_refcount) > 0);
+ /* refcount could be anything; it's possible for the
+ * buffers to continue to get filled after all the server
+ * threads exited. But we know they _have_ exited.
+ */
+
+ (void) PtlMEUnlink(rqbd->rqbd_me_h);
+ /* The callback handler could have unlinked this ME already
+ * (we're racing with her) but it's safe to ensure it _has_
+ * been unlinked.
+ */
+
+ OBD_FREE (rqbd->rqbd_buffer, service->srv_buf_size);
+ OBD_FREE (rqbd, sizeof (*rqbd));
+ service->srv_nrqbds--;
}
+ LASSERT (service->srv_nrqbds == 0);
+
rc = PtlEQFree(service->srv_eq_h);
if (rc)
CERROR("PtlEQFree failed: %d\n", rc);
- if (!list_empty(&service->srv_reqs)) {
- // XXX reply with errors and clean up
- CERROR("Request list not empty!\n");
- rc = -EBUSY;
- }
-
OBD_FREE(service, sizeof(*service));
if (rc)
LBUG();