struct ptlrpc_thread *thread, ptl_event_t *event)
{
struct ptlrpc_srv_ni *srv_ni;
- int i;
- int idx;
- int rc;
+ int i, idx, rc;
ENTRY;
spin_lock(&svc->srv_lock);
case PTL_EQ_EMPTY:
continue;
+ case PTL_EQ_DROPPED:
+ CWARN("Event queue overflow (bug 2125): timeouts will "
+ "follow.\n");
+ continue;
+
default:
CERROR("BUG: PtlEQGet returned %d\n", rc);
LBUG();
__u32 bufsize, __u32 max_req_size,
int req_portal, int rep_portal,
svc_handler_t handler, char *name,
- struct obd_device *obddev)
+ struct proc_dir_entry *proc_entry)
{
int i, j, ssize, rc;
struct ptlrpc_service *service;
for (j = 0; j < nbufs; j++) {
struct ptlrpc_request_buffer_desc *rqbd;
- OBD_ALLOC(rqbd, sizeof(*rqbd));
+ OBD_ALLOC_WAIT(rqbd, sizeof(*rqbd));
if (rqbd == NULL) {
CERROR ("%s.%d: Can't allocate request "
"descriptor %d on %s\n",
rqbd->rqbd_me_h = PTL_HANDLE_NONE;
atomic_set(&rqbd->rqbd_refcount, 0);
- OBD_ALLOC(rqbd->rqbd_buffer, service->srv_buf_size);
+ OBD_ALLOC_WAIT(rqbd->rqbd_buffer, service->srv_buf_size);
if (rqbd->rqbd_buffer == NULL) {
CERROR ("%s.%d: Can't allocate request "
"buffer %d on %s\n",
}
}
- ptlrpc_lprocfs_register_service(obddev, service);
+ if (proc_entry != NULL)
+ ptlrpc_lprocfs_register_service(proc_entry, service);
CDEBUG(D_NET, "%s: Started on %d interfaces, listening on portal %d\n",
service->srv_name, ptlrpc_ninterfaces, service->srv_req_portal);
if (request->rq_export) {
request->rq_connection = request->rq_export->exp_connection;
ptlrpc_connection_addref(request->rq_connection);
+ if (request->rq_reqmsg->conn_cnt <
+ request->rq_export->exp_conn_cnt) {
+ DEBUG_REQ(D_ERROR, request,
+ "DROPPING req from old connection %d < %d",
+ request->rq_reqmsg->conn_cnt,
+ request->rq_export->exp_conn_cnt);
+ goto put_conn;
+ }
+
request->rq_export->exp_last_request_time =
LTIME_S(CURRENT_TIME);
} else {
ptlrpc_get_connection(&request->rq_peer, NULL);
}
- CDEBUG(D_RPCTRACE, "Handling RPC pname:cluuid:pid:xid:ni:nid:opc %s:%s:%d:"
- LPU64":%s:"LPX64":%d\n",
- current->comm,
- (request->rq_export ?
- (char *)request->rq_export->exp_client_uuid.uuid : "0"),
+ CDEBUG(D_RPCTRACE, "Handling RPC pname:cluuid+ref:pid:xid:ni:nid:opc "
+ "%s:%s+%d:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
+ (request->rq_export ?
+ (char *)request->rq_export->exp_client_uuid.uuid : "0"),
+ (request->rq_export ?
+ atomic_read(&request->rq_export->exp_refcount) : -99),
request->rq_reqmsg->status, request->rq_xid,
rqbd->rqbd_srv_ni->sni_ni->pni_name, event->initiator.nid,
request->rq_reqmsg->opc);
rc = svc->srv_handler(request);
- CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid:pid:xid:ni:nid:opc %s:%s:%d:"
- LPU64":%s:"LPX64":%d\n",
- current->comm,
- (request->rq_export ?
+ CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:ni:nid:opc "
+ "%s:%s+%d:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
+ (request->rq_export ?
(char *)request->rq_export->exp_client_uuid.uuid : "0"),
+ (request->rq_export ?
+ atomic_read(&request->rq_export->exp_refcount) : -99),
request->rq_reqmsg->status, request->rq_xid,
rqbd->rqbd_srv_ni->sni_ni->pni_name, event->initiator.nid,
request->rq_reqmsg->opc);
+put_conn:
ptlrpc_put_connection(request->rq_connection);
if (request->rq_export != NULL)
class_export_put(request->rq_export);
void ptlrpc_daemonize(void)
{
exit_mm(current);
-
- current->session = 1;
- current->pgrp = 1;
- current->tty = NULL;
-
+ lustre_daemonize_helper();
exit_files(current);
reparent_to_init();
}
/* And now, loop forever on requests */
while (1) {
struct l_wait_info lwi = { 0 };
- l_wait_event(svc->srv_waitq,
- ptlrpc_check_event(svc, thread, event), &lwi);
+ l_wait_event_exclusive(svc->srv_waitq,
+ ptlrpc_check_event(svc, thread, event),
+ &lwi);
spin_lock(&svc->srv_lock);
if (thread->t_flags & SVC_STOPPING) {
thread->t_flags = SVC_STOPPING;
spin_unlock(&svc->srv_lock);
- wake_up(&svc->srv_waitq);
+ wake_up_all(&svc->srv_waitq);
l_wait_event(thread->t_ctl_waitq, (thread->t_flags & SVC_STOPPED),
&lwi);
}
spin_unlock(&svc->srv_lock);
}
+int ptlrpc_start_n_threads(struct obd_device *dev, struct ptlrpc_service *svc,
+ int num_threads, char *base_name)
+{
+ int i, rc = 0;
+ ENTRY;
+
+ for (i = 0; i < num_threads; i++) {
+ char name[32];
+ sprintf(name, "%s_%02d", base_name, i);
+ rc = ptlrpc_start_thread(dev, svc, name);
+ if (rc) {
+ CERROR("cannot start %s thread #%d: rc %d\n", base_name,
+ i, rc);
+ ptlrpc_stop_all_threads(svc);
+ }
+ }
+ RETURN(rc);
+}
+
int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
char *name)
{