Whamcloud - gitweb
land v0.9.1 on HEAD, in preparation for a 1.0.x branch
[fs/lustre-release.git] / lustre / ptlrpc / service.c
index 22ccb09..526b35c 100644 (file)
@@ -37,9 +37,7 @@ static int ptlrpc_check_event(struct ptlrpc_service *svc,
                               struct ptlrpc_thread *thread, ptl_event_t *event)
 {
         struct ptlrpc_srv_ni *srv_ni;
-        int i;
-        int idx;
-        int rc;
+        int i, idx, rc;
         ENTRY;
 
         spin_lock(&svc->srv_lock);
@@ -67,6 +65,11 @@ static int ptlrpc_check_event(struct ptlrpc_service *svc,
                 case PTL_EQ_EMPTY:
                         continue;
 
+                case PTL_EQ_DROPPED:
+                        CWARN("Event queue overflow (bug 2125): timeouts will "
+                              "follow.\n");
+                        continue;
+
                 default:
                         CERROR("BUG: PtlEQGet returned %d\n", rc);
                         LBUG();
@@ -83,7 +86,7 @@ struct ptlrpc_service * ptlrpc_init_svc(__u32 nevents, __u32 nbufs,
                                         __u32 bufsize, __u32 max_req_size,
                                         int req_portal, int rep_portal,
                                         svc_handler_t handler, char *name,
-                                        struct obd_device *obddev)
+                                        struct proc_dir_entry *proc_entry)
 {
         int i, j, ssize, rc;
         struct ptlrpc_service *service;
@@ -141,7 +144,7 @@ struct ptlrpc_service * ptlrpc_init_svc(__u32 nevents, __u32 nbufs,
                 for (j = 0; j < nbufs; j++) {
                         struct ptlrpc_request_buffer_desc *rqbd;
 
-                        OBD_ALLOC(rqbd, sizeof(*rqbd));
+                        OBD_ALLOC_WAIT(rqbd, sizeof(*rqbd));
                         if (rqbd == NULL) {
                                 CERROR ("%s.%d: Can't allocate request "
                                         "descriptor %d on %s\n",
@@ -154,7 +157,7 @@ struct ptlrpc_service * ptlrpc_init_svc(__u32 nevents, __u32 nbufs,
                         rqbd->rqbd_me_h = PTL_HANDLE_NONE;
                         atomic_set(&rqbd->rqbd_refcount, 0);
 
-                        OBD_ALLOC(rqbd->rqbd_buffer, service->srv_buf_size);
+                        OBD_ALLOC_WAIT(rqbd->rqbd_buffer, service->srv_buf_size);
                         if (rqbd->rqbd_buffer == NULL) {
                                 CERROR ("%s.%d: Can't allocate request "
                                         "buffer %d on %s\n",
@@ -170,7 +173,8 @@ struct ptlrpc_service * ptlrpc_init_svc(__u32 nevents, __u32 nbufs,
                 }
         }
 
-        ptlrpc_lprocfs_register_service(obddev, service);
+        if (proc_entry != NULL)
+                ptlrpc_lprocfs_register_service(proc_entry, service);
 
         CDEBUG(D_NET, "%s: Started on %d interfaces, listening on portal %d\n",
                service->srv_name, ptlrpc_ninterfaces, service->srv_req_portal);
@@ -236,6 +240,15 @@ static int handle_incoming_request(struct obd_device *obddev,
         if (request->rq_export) {
                 request->rq_connection = request->rq_export->exp_connection;
                 ptlrpc_connection_addref(request->rq_connection);
+                if (request->rq_reqmsg->conn_cnt < 
+                    request->rq_export->exp_conn_cnt) {
+                        DEBUG_REQ(D_ERROR, request,
+                                  "DROPPING req from old connection %d < %d",
+                                  request->rq_reqmsg->conn_cnt,
+                                  request->rq_export->exp_conn_cnt);
+                        goto put_conn;
+                }
+
                 request->rq_export->exp_last_request_time =
                         LTIME_S(CURRENT_TIME);
         } else {
@@ -246,25 +259,28 @@ static int handle_incoming_request(struct obd_device *obddev,
                         ptlrpc_get_connection(&request->rq_peer, NULL);
         }
 
-        CDEBUG(D_RPCTRACE, "Handling RPC pname:cluuid:pid:xid:ni:nid:opc %s:%s:%d:"
-               LPU64":%s:"LPX64":%d\n",
-               current->comm,
-               (request->rq_export ? 
-                (char *)request->rq_export->exp_client_uuid.uuid : "0"), 
+        CDEBUG(D_RPCTRACE, "Handling RPC pname:cluuid+ref:pid:xid:ni:nid:opc "
+               "%s:%s+%d:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
+               (request->rq_export ?
+                (char *)request->rq_export->exp_client_uuid.uuid : "0"),
+               (request->rq_export ?
+                atomic_read(&request->rq_export->exp_refcount) : -99),
                request->rq_reqmsg->status, request->rq_xid,
                rqbd->rqbd_srv_ni->sni_ni->pni_name, event->initiator.nid,
                request->rq_reqmsg->opc);
 
         rc = svc->srv_handler(request);
-        CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid:pid:xid:ni:nid:opc %s:%s:%d:"
-               LPU64":%s:"LPX64":%d\n",
-               current->comm,
-               (request->rq_export ? 
+        CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:ni:nid:opc "
+               "%s:%s+%d:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
+               (request->rq_export ?
                 (char *)request->rq_export->exp_client_uuid.uuid : "0"),
+               (request->rq_export ?
+                atomic_read(&request->rq_export->exp_refcount) : -99),
                request->rq_reqmsg->status, request->rq_xid,
                rqbd->rqbd_srv_ni->sni_ni->pni_name, event->initiator.nid,
                request->rq_reqmsg->opc);
 
+put_conn:
         ptlrpc_put_connection(request->rq_connection);
         if (request->rq_export != NULL)
                 class_export_put(request->rq_export);
@@ -280,11 +296,7 @@ static int handle_incoming_request(struct obd_device *obddev,
 void ptlrpc_daemonize(void)
 {
         exit_mm(current);
-
-        current->session = 1;
-        current->pgrp = 1;
-        current->tty = NULL;
-
+        lustre_daemonize_helper();
         exit_files(current);
         reparent_to_init();
 }
@@ -337,8 +349,9 @@ static int ptlrpc_main(void *arg)
         /* And now, loop forever on requests */
         while (1) {
                 struct l_wait_info lwi = { 0 };
-                l_wait_event(svc->srv_waitq,
-                             ptlrpc_check_event(svc, thread, event), &lwi);
+                l_wait_event_exclusive(svc->srv_waitq,
+                                       ptlrpc_check_event(svc, thread, event),
+                                       &lwi);
 
                 spin_lock(&svc->srv_lock);
                 if (thread->t_flags & SVC_STOPPING) {
@@ -428,7 +441,7 @@ static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
         thread->t_flags = SVC_STOPPING;
         spin_unlock(&svc->srv_lock);
 
-        wake_up(&svc->srv_waitq);
+        wake_up_all(&svc->srv_waitq);
         l_wait_event(thread->t_ctl_waitq, (thread->t_flags & SVC_STOPPED),
                      &lwi);
 }
@@ -449,6 +462,25 @@ void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
         spin_unlock(&svc->srv_lock);
 }
 
+int ptlrpc_start_n_threads(struct obd_device *dev, struct ptlrpc_service *svc,
+                           int num_threads, char *base_name)
+{
+        int i, rc = 0;
+        ENTRY;
+
+        for (i = 0; i < num_threads; i++) {
+                char name[32];
+                sprintf(name, "%s_%02d", base_name, i);
+                rc = ptlrpc_start_thread(dev, svc, name);
+                if (rc) {
+                        CERROR("cannot start %s thread #%d: rc %d\n", base_name,
+                               i, rc);
+                        ptlrpc_stop_all_threads(svc);
+                }
+        }
+        RETURN(rc);
+}
+
 int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
                         char *name)
 {