LU-292 srv_n_queued_reqs is screwed up by ptlrpc_unregister_service
diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c
index 2768325..0513730 100644
@@ -68,28 +68,6 @@ static int ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc);
 static CFS_LIST_HEAD(ptlrpc_all_services);
 cfs_spinlock_t ptlrpc_all_services_lock;
 
-static char *
-ptlrpc_alloc_request_buffer (int size)
-{
-        char *ptr;
-
-        if (size > SVC_BUF_VMALLOC_THRESHOLD)
-                OBD_VMALLOC(ptr, size);
-        else
-                OBD_ALLOC(ptr, size);
-
-        return (ptr);
-}
-
-static void
-ptlrpc_free_request_buffer (char *ptr, int size)
-{
-        if (size > SVC_BUF_VMALLOC_THRESHOLD)
-                OBD_VFREE(ptr, size);
-        else
-                OBD_FREE(ptr, size);
-}
-
 struct ptlrpc_request_buffer_desc *
 ptlrpc_alloc_rqbd (struct ptlrpc_service *svc)
 {
@@ -104,7 +82,7 @@ ptlrpc_alloc_rqbd (struct ptlrpc_service *svc)
         rqbd->rqbd_cbid.cbid_fn = request_in_callback;
         rqbd->rqbd_cbid.cbid_arg = rqbd;
         CFS_INIT_LIST_HEAD(&rqbd->rqbd_reqs);
-        rqbd->rqbd_buffer = ptlrpc_alloc_request_buffer(svc->srv_buf_size);
+        OBD_ALLOC_LARGE(rqbd->rqbd_buffer, svc->srv_buf_size);
 
         if (rqbd->rqbd_buffer == NULL) {
                 OBD_FREE_PTR(rqbd);
@@ -132,7 +110,7 @@ ptlrpc_free_rqbd (struct ptlrpc_request_buffer_desc *rqbd)
         svc->srv_nbufs--;
         cfs_spin_unlock(&svc->srv_lock);
 
-        ptlrpc_free_request_buffer (rqbd->rqbd_buffer, svc->srv_buf_size);
+        OBD_FREE_LARGE(rqbd->rqbd_buffer, svc->srv_buf_size);
         OBD_FREE_PTR(rqbd);
 }
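
The three hunks above replace the service-local ptlrpc_alloc_request_buffer()/ptlrpc_free_request_buffer() helpers with the shared OBD_ALLOC_LARGE()/OBD_FREE_LARGE() macros, so the kmalloc-versus-vmalloc decision for big request buffers lives in one place instead of being open-coded per caller. A minimal userspace sketch of that pattern, with illustrative names and malloc()/free() standing in for the kernel allocators:

    #include <stdlib.h>

    /* Illustrative cutoff; the kernel code picks vmalloc() above it and
     * kmalloc() below it.  Plain malloc()/free() stand in for both here. */
    #define LARGE_ALLOC_THRESHOLD   (2 * 4096)

    static void *alloc_large(size_t size)
    {
            /* kernel: size > LARGE_ALLOC_THRESHOLD ? vmalloc() : kmalloc() */
            return malloc(size);
    }

    static void free_large(void *ptr, size_t size)
    {
            /* kernel: size > LARGE_ALLOC_THRESHOLD ? vfree() : kfree() */
            (void)size;
            free(ptr);
    }

    int main(void)
    {
            void *buf = alloc_large(1 << 20);   /* e.g. srv_buf_size */

            if (buf == NULL)
                    return 1;
            free_large(buf, 1 << 20);
            return 0;
    }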
 
@@ -643,32 +621,6 @@ static void ptlrpc_server_free_request(struct ptlrpc_request *req)
 }
 
 /**
- * increment the number of active requests consuming service threads.
- */
-void ptlrpc_server_active_request_inc(struct ptlrpc_request *req)
-{
-        struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
-        struct ptlrpc_service *svc = rqbd->rqbd_service;
-
-        cfs_spin_lock(&svc->srv_rq_lock);
-        svc->srv_n_active_reqs++;
-        cfs_spin_unlock(&svc->srv_rq_lock);
-}
-
-/**
- * decrement the number of active requests consuming service threads.
- */
-void ptlrpc_server_active_request_dec(struct ptlrpc_request *req)
-{
-        struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
-        struct ptlrpc_service *svc = rqbd->rqbd_service;
-
-        cfs_spin_lock(&svc->srv_rq_lock);
-        svc->srv_n_active_reqs--;
-        cfs_spin_unlock(&svc->srv_rq_lock);
-}
-
-/**
  * drop a reference count of the request. if it reaches 0, we either
  * put it into history list, or free it immediately.
  */
@@ -879,6 +831,8 @@ static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
  */
 static int ptlrpc_check_req(struct ptlrpc_request *req)
 {
+        int rc = 0;
+
         if (unlikely(lustre_msg_get_conn_cnt(req->rq_reqmsg) <
                      req->rq_export->exp_conn_cnt)) {
                 DEBUG_REQ(D_ERROR, req,
@@ -893,12 +847,28 @@ static int ptlrpc_check_req(struct ptlrpc_request *req)
                 error response instead. */
                 CDEBUG(D_RPCTRACE, "Dropping req %p for failed obd %s\n",
                        req, req->rq_export->exp_obd->obd_name);
-                req->rq_status = -ENODEV;
+                rc = -ENODEV;
+        } else if (lustre_msg_get_flags(req->rq_reqmsg) &
+                   (MSG_REPLAY | MSG_REQ_REPLAY_DONE) &&
+                   !(req->rq_export->exp_obd->obd_recovering)) {
+                DEBUG_REQ(D_ERROR, req, "Invalid replay without recovery");
+                class_fail_export(req->rq_export);
+                rc = -ENODEV;
+        } else if (lustre_msg_get_transno(req->rq_reqmsg) != 0 &&
+                   !(req->rq_export->exp_obd->obd_recovering)) {
+                DEBUG_REQ(D_ERROR, req, "Invalid req with transno "
+                          LPU64" without recovery",
+                          lustre_msg_get_transno(req->rq_reqmsg));
+                class_fail_export(req->rq_export);
+                rc = -ENODEV;
+        }
+
+        if (unlikely(rc < 0)) {
+                req->rq_status = rc;
                 ptlrpc_error(req);
-                return -ENODEV;
         }
-
-        return 0;
+        return rc;
 }
 
 static void ptlrpc_at_set_timer(struct ptlrpc_service *svc)
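
The ptlrpc_check_req() rewrite above folds several early returns into a single rc accumulator, so every rejection shares one exit path that sets rq_status and sends the error reply. It also adds two sanity checks: a request carrying a replay flag, or a non-zero transno, while the export's obd is not recovering, now fails the export and is rejected with -ENODEV. A compact sketch of the single-exit shape (the struct fields and the first errno are illustrative, not the Lustre types):

    #include <errno.h>
    #include <stdio.h>

    /* Illustrative stand-in for the request state the real check reads. */
    struct req_view {
            int conn_cnt, exp_conn_cnt;  /* connection generation counters */
            int obd_failed;              /* target obd already failed */
            int replay_flag;             /* MSG_REPLAY / MSG_REQ_REPLAY_DONE */
            long long transno;           /* transno carried by the request */
            int recovering;              /* export's obd is in recovery */
    };

    static int check_req(const struct req_view *r)
    {
            int rc = 0;

            if (r->conn_cnt < r->exp_conn_cnt)
                    rc = -ENOTCONN;              /* illustrative errno */
            else if (r->obd_failed)
                    rc = -ENODEV;
            else if (r->replay_flag && !r->recovering)
                    rc = -ENODEV;                /* replay outside recovery */
            else if (r->transno != 0 && !r->recovering)
                    rc = -ENODEV;                /* transno outside recovery */

            if (rc < 0)
                    fprintf(stderr, "rejecting request: rc %d\n", rc);
            /* single exit: one place reports the error and replies */
            return rc;
    }

    int main(void)
    {
            struct req_view bad = { .replay_flag = 1 };

            return check_req(&bad) == -ENODEV ? 0 : 1;
    }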
@@ -1054,7 +1024,7 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
         OBD_ALLOC(reqcopy, sizeof *reqcopy);
         if (reqcopy == NULL)
                 RETURN(-ENOMEM);
-        OBD_ALLOC(reqmsg, req->rq_reqlen);
+        OBD_ALLOC_LARGE(reqmsg, req->rq_reqlen);
         if (!reqmsg) {
                 OBD_FREE(reqcopy, sizeof *reqcopy);
                 RETURN(-ENOMEM);
@@ -1114,7 +1084,7 @@ out_put:
         class_export_put(reqcopy->rq_export);
 out:
         sptlrpc_svc_ctx_decref(reqcopy);
-        OBD_FREE(reqmsg, req->rq_reqlen);
+        OBD_FREE_LARGE(reqmsg, req->rq_reqlen);
         OBD_FREE(reqcopy, sizeof *reqcopy);
         RETURN(rc);
 }
@@ -1525,9 +1495,9 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
         }
 
         if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DROP_REQ_OPC) &&
-            lustre_msg_get_opc(req->rq_reqmsg) == obd_fail_val) {
+            lustre_msg_get_opc(req->rq_reqmsg) == cfs_fail_val) {
                 CERROR("drop incoming rpc opc %u, x"LPU64"\n",
-                       obd_fail_val, req->rq_xid);
+                       cfs_fail_val, req->rq_xid);
                 goto err_req;
         }
 
@@ -1735,7 +1705,7 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
                lustre_msg_get_opc(request->rq_reqmsg));
 
         if (lustre_msg_get_opc(request->rq_reqmsg) != OBD_PING)
-                OBD_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_PAUSE_REQ, obd_fail_val);
+                CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_PAUSE_REQ, cfs_fail_val);
 
         rc = svc->srv_handler(request);
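
The two hunks above reflect the move of the fault-injection hooks into libcfs: obd_fail_val becomes cfs_fail_val and OBD_FAIL_TIMEOUT_MS becomes CFS_FAIL_TIMEOUT_MS, with the drop-or-delay behavior itself unchanged. A hedged sketch of the underlying fail-loc pattern, with illustrative names and an illustrative id:

    #include <inttypes.h>
    #include <stdio.h>

    /* One armed (fail_loc, fail_val) pair, set from userspace via procfs
     * in the real implementation; these names and the id are illustrative. */
    static uint32_t fail_loc;
    static uint32_t fail_val;

    #define FAIL_DROP_REQ_OPC  0x50a

    static int fail_check(uint32_t id)
    {
            return fail_loc == id;
    }

    /* drop an incoming request iff the injection point is armed and the
     * opcode matches the armed value, as the first hunk above does */
    static int maybe_drop(uint32_t opc)
    {
            if (fail_check(FAIL_DROP_REQ_OPC) && opc == fail_val) {
                    fprintf(stderr, "drop incoming rpc opc %" PRIu32 "\n", opc);
                    return 1;
            }
            return 0;
    }

    int main(void)
    {
            fail_loc = FAIL_DROP_REQ_OPC;   /* arm the injection point */
            fail_val = 400;                 /* target opcode */
            return maybe_drop(400) ? 0 : 1;
    }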
 
@@ -2169,7 +2139,7 @@ static int ptlrpc_main(void *arg)
         env.le_ctx.lc_cookie = 0x6;
 
         /* Alloc reply state structure for this one */
-        OBD_ALLOC_GFP(rs, svc->srv_max_reply_size, CFS_ALLOC_STD);
+        OBD_ALLOC_LARGE(rs, svc->srv_max_reply_size);
         if (!rs) {
                 rc = -ENOMEM;
                 goto out_srv_fini;
@@ -2355,8 +2325,7 @@ static int ptlrpc_start_hr_thread(struct ptlrpc_hr_service *hr, int n, int cpu)
         args.cpu_index = cpu;
         args.hrs = hr;
 
-        rc = cfs_kernel_thread(ptlrpc_hr_main, (void*)&args,
-                               CLONE_VM|CLONE_FILES);
+        rc = cfs_create_thread(ptlrpc_hr_main, (void*)&args, CFS_DAEMON_FLAGS);
         if (rc < 0) {
                 cfs_complete(&t->hrt_completion);
                 GOTO(out, rc);
@@ -2548,7 +2517,7 @@ int ptlrpc_start_thread(struct ptlrpc_service *svc)
         /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
          * just drop the VM and FILES in cfs_daemonize_ctxt() right away.
          */
-        rc = cfs_kernel_thread(ptlrpc_main, &d, CLONE_VM | CLONE_FILES);
+        rc = cfs_create_thread(ptlrpc_main, &d, CFS_DAEMON_FLAGS);
         if (rc < 0) {
                 CERROR("cannot start thread '%s': rc %d\n", name, rc);
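
Both thread-start sites now go through cfs_create_thread(..., CFS_DAEMON_FLAGS) instead of spelling out cfs_kernel_thread(..., CLONE_VM|CLONE_FILES), keeping the Linux clone flags behind one portable wrapper. A userspace sketch of such a wrapper, with pthreads standing in for kernel_thread():

    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>

    typedef int (*thread_fn_t)(void *);

    struct start { thread_fn_t fn; void *arg; };

    static void *start_shim(void *p)
    {
            struct start *s = p;

            s->fn(s->arg);
            free(s);
            return NULL;
    }

    /* one portable entry point: a kernel build would call kernel_thread()
     * with CLONE_VM|CLONE_FILES here; pthreads stand in for the demo */
    static int create_thread(thread_fn_t fn, void *arg)
    {
            struct start *s = malloc(sizeof(*s));
            pthread_t tid;

            if (s == NULL)
                    return -1;
            s->fn = fn;
            s->arg = arg;
            if (pthread_create(&tid, NULL, start_shim, s) != 0) {
                    free(s);
                    return -1;
            }
            return pthread_join(tid, NULL);  /* joined only for the demo */
    }

    static int worker(void *arg)
    {
            printf("worker running: %s\n", (const char *)arg);
            return 0;
    }

    int main(void)
    {
            return create_thread(worker, "hello") != 0;
    }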
 
@@ -2726,7 +2695,6 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
 
                 req = ptlrpc_server_request_get(service, 1);
                 cfs_list_del(&req->rq_list);
-                service->srv_n_queued_reqs--;
                 service->srv_n_active_reqs++;
                 ptlrpc_hpreq_fini(req);
                 ptlrpc_server_finish_request(service, req);
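
This hunk is the fix the commit subject names: the shutdown loop in ptlrpc_unregister_service() decremented srv_n_queued_reqs after ptlrpc_server_request_get() had, by all appearances, already accounted for the dequeue, so the counter was decremented twice per drained request. A toy reproduction of that accounting bug (names and structure are illustrative):

    #include <stdio.h>

    struct svc { int n_queued; };

    /* the normal dequeue path already maintains the counter */
    static void request_get(struct svc *s)
    {
            s->n_queued--;
    }

    static void drain(struct svc *s, int buggy)
    {
            while (s->n_queued > 0) {
                    request_get(s);
                    if (buggy)
                            s->n_queued--;   /* the line the fix removes */
            }
    }

    int main(void)
    {
            struct svc s = { .n_queued = 3 };

            drain(&s, 1);
            /* prints -1: the double decrement drives the counter negative */
            printf("queued after drain: %d (expected 0)\n", s.n_queued);
            return s.n_queued != 0;
    }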
@@ -2752,7 +2720,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
         cfs_list_for_each_entry_safe(rs, t, &service->srv_free_rs_list,
                                      rs_list) {
                 cfs_list_del(&rs->rs_list);
-                OBD_FREE(rs, service->srv_max_reply_size);
+                OBD_FREE_LARGE(rs, service->srv_max_reply_size);
         }
 
         /* In case somebody rearmed this in the meantime */