Whamcloud - gitweb
LU-663 kernel: Some arch do not have NUMA features anymore
[fs/lustre-release.git] / lustre / ptlrpc / service.c
index cfb798f..b4b036f 100644 (file)
@@ -28,6 +28,9 @@
 /*
  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
+ *
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -68,28 +71,6 @@ static int ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc);
 static CFS_LIST_HEAD(ptlrpc_all_services);
 cfs_spinlock_t ptlrpc_all_services_lock;
 
-static char *
-ptlrpc_alloc_request_buffer (int size)
-{
-        char *ptr;
-
-        if (size > SVC_BUF_VMALLOC_THRESHOLD)
-                OBD_VMALLOC(ptr, size);
-        else
-                OBD_ALLOC(ptr, size);
-
-        return (ptr);
-}
-
-static void
-ptlrpc_free_request_buffer (char *ptr, int size)
-{
-        if (size > SVC_BUF_VMALLOC_THRESHOLD)
-                OBD_VFREE(ptr, size);
-        else
-                OBD_FREE(ptr, size);
-}
-
 struct ptlrpc_request_buffer_desc *
 ptlrpc_alloc_rqbd (struct ptlrpc_service *svc)
 {
@@ -104,7 +85,7 @@ ptlrpc_alloc_rqbd (struct ptlrpc_service *svc)
         rqbd->rqbd_cbid.cbid_fn = request_in_callback;
         rqbd->rqbd_cbid.cbid_arg = rqbd;
         CFS_INIT_LIST_HEAD(&rqbd->rqbd_reqs);
-        rqbd->rqbd_buffer = ptlrpc_alloc_request_buffer(svc->srv_buf_size);
+        OBD_ALLOC_LARGE(rqbd->rqbd_buffer, svc->srv_buf_size);
 
         if (rqbd->rqbd_buffer == NULL) {
                 OBD_FREE_PTR(rqbd);
@@ -132,7 +113,7 @@ ptlrpc_free_rqbd (struct ptlrpc_request_buffer_desc *rqbd)
         svc->srv_nbufs--;
         cfs_spin_unlock(&svc->srv_lock);
 
-        ptlrpc_free_request_buffer (rqbd->rqbd_buffer, svc->srv_buf_size);
+        OBD_FREE_LARGE(rqbd->rqbd_buffer, svc->srv_buf_size);
         OBD_FREE_PTR(rqbd);
 }
 
@@ -643,32 +624,6 @@ static void ptlrpc_server_free_request(struct ptlrpc_request *req)
 }
 
 /**
- * increment the number of active requests consuming service threads.
- */
-void ptlrpc_server_active_request_inc(struct ptlrpc_request *req)
-{
-        struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
-        struct ptlrpc_service *svc = rqbd->rqbd_service;
-
-        cfs_spin_lock(&svc->srv_rq_lock);
-        svc->srv_n_active_reqs++;
-        cfs_spin_unlock(&svc->srv_rq_lock);
-}
-
-/**
- * decrement the number of active requests consuming service threads.
- */
-void ptlrpc_server_active_request_dec(struct ptlrpc_request *req)
-{
-        struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
-        struct ptlrpc_service *svc = rqbd->rqbd_service;
-
-        cfs_spin_lock(&svc->srv_rq_lock);
-        svc->srv_n_active_reqs--;
-        cfs_spin_unlock(&svc->srv_rq_lock);
-}
-
-/**
  * drop a reference count of the request. if it reaches 0, we either
  * put it into history list, or free it immediately.
  */
@@ -879,6 +834,8 @@ static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
  */
 static int ptlrpc_check_req(struct ptlrpc_request *req)
 {
+        int rc = 0;
+
         if (unlikely(lustre_msg_get_conn_cnt(req->rq_reqmsg) <
                      req->rq_export->exp_conn_cnt)) {
                 DEBUG_REQ(D_ERROR, req,
@@ -893,12 +850,28 @@ static int ptlrpc_check_req(struct ptlrpc_request *req)
                 error response instead. */
                 CDEBUG(D_RPCTRACE, "Dropping req %p for failed obd %s\n",
                        req, req->rq_export->exp_obd->obd_name);
-                req->rq_status = -ENODEV;
+                rc = -ENODEV;
+        } else if (lustre_msg_get_flags(req->rq_reqmsg) &
+                   (MSG_REPLAY | MSG_REQ_REPLAY_DONE) &&
+                   !(req->rq_export->exp_obd->obd_recovering)) {
+                        DEBUG_REQ(D_ERROR, req,
+                                  "Invalid replay without recovery");
+                        class_fail_export(req->rq_export);
+                        rc = -ENODEV;
+        } else if (lustre_msg_get_transno(req->rq_reqmsg) != 0 &&
+                   !(req->rq_export->exp_obd->obd_recovering)) {
+                        DEBUG_REQ(D_ERROR, req, "Invalid req with transno "
+                                  LPU64" without recovery",
+                                  lustre_msg_get_transno(req->rq_reqmsg));
+                        class_fail_export(req->rq_export);
+                        rc = -ENODEV;
+        }
+
+        if (unlikely(rc < 0)) {
+                req->rq_status = rc;
                 ptlrpc_error(req);
-                return -ENODEV;
         }
-
-        return 0;
+        return rc;
 }
 
 static void ptlrpc_at_set_timer(struct ptlrpc_service *svc)
@@ -1054,7 +1027,7 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
         OBD_ALLOC(reqcopy, sizeof *reqcopy);
         if (reqcopy == NULL)
                 RETURN(-ENOMEM);
-        OBD_ALLOC(reqmsg, req->rq_reqlen);
+        OBD_ALLOC_LARGE(reqmsg, req->rq_reqlen);
         if (!reqmsg) {
                 OBD_FREE(reqcopy, sizeof *reqcopy);
                 RETURN(-ENOMEM);
@@ -1114,7 +1087,7 @@ out_put:
         class_export_put(reqcopy->rq_export);
 out:
         sptlrpc_svc_ctx_decref(reqcopy);
-        OBD_FREE(reqmsg, req->rq_reqlen);
+        OBD_FREE_LARGE(reqmsg, req->rq_reqlen);
         OBD_FREE(reqcopy, sizeof *reqcopy);
         RETURN(rc);
 }
@@ -1390,6 +1363,10 @@ static int ptlrpc_server_high_pending(struct ptlrpc_service *svc, int force)
  */
 static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force)
 {
+#ifndef __KERNEL__
+        if (1) /* always allow to handle normal request for liblustre */
+                return 1;
+#endif
         if (force ||
             svc->srv_n_active_reqs < svc->srv_threads_running - 2)
                 return 1;
@@ -1521,9 +1498,9 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
         }
 
         if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DROP_REQ_OPC) &&
-            lustre_msg_get_opc(req->rq_reqmsg) == obd_fail_val) {
+            lustre_msg_get_opc(req->rq_reqmsg) == cfs_fail_val) {
                 CERROR("drop incoming rpc opc %u, x"LPU64"\n",
-                       obd_fail_val, req->rq_xid);
+                       cfs_fail_val, req->rq_xid);
                 goto err_req;
         }
 
@@ -1542,6 +1519,7 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
                 break;
         case MDS_READPAGE:
         case OST_READ:
+        case MGS_CONFIG_READ:
                 req->rq_bulk_read = 1;
                 break;
         }
@@ -1731,7 +1709,7 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
                lustre_msg_get_opc(request->rq_reqmsg));
 
         if (lustre_msg_get_opc(request->rq_reqmsg) != OBD_PING)
-                OBD_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_PAUSE_REQ, obd_fail_val);
+                CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_PAUSE_REQ, cfs_fail_val);
 
         rc = svc->srv_handler(request);
 
@@ -2070,6 +2048,33 @@ ptlrpc_server_request_waiting(struct ptlrpc_service *svc)
         return !cfs_list_empty(&svc->srv_req_in_queue);
 }
 
+static __attribute__((__noinline__)) int
+ptlrpc_wait_event(struct ptlrpc_service *svc,
+                  struct ptlrpc_thread *thread)
+{
+        /* Don't exit while there are replies to be handled */
+        struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
+                                             ptlrpc_retry_rqbds, svc);
+
+        lc_watchdog_disable(thread->t_watchdog);
+
+        cfs_cond_resched();
+
+        l_wait_event_exclusive_head(svc->srv_waitq,
+                               ptlrpc_thread_stopping(thread) ||
+                               ptlrpc_server_request_waiting(svc) ||
+                               ptlrpc_server_request_pending(svc, 0) ||
+                               ptlrpc_rqbd_pending(svc) ||
+                               ptlrpc_at_check(svc), &lwi);
+
+        if (ptlrpc_thread_stopping(thread))
+                return -EINTR;
+
+        lc_watchdog_touch(thread->t_watchdog, CFS_GET_TIMEOUT(svc));
+
+        return 0;
+}
+
 /**
  * Main thread body for service threads.
  * Waits in a loop waiting for new requests to process to appear.
@@ -2138,7 +2143,7 @@ static int ptlrpc_main(void *arg)
         env.le_ctx.lc_cookie = 0x6;
 
         /* Alloc reply state structure for this one */
-        OBD_ALLOC_GFP(rs, svc->srv_max_reply_size, CFS_ALLOC_STD);
+        OBD_ALLOC_LARGE(rs, svc->srv_max_reply_size);
         if (!rs) {
                 rc = -ENOMEM;
                 goto out_srv_fini;
@@ -2176,26 +2181,9 @@ static int ptlrpc_main(void *arg)
 
         /* XXX maintain a list of all managed devices: insert here */
         while (!ptlrpc_thread_stopping(thread)) {
-                /* Don't exit while there are replies to be handled */
-                struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
-                                                     ptlrpc_retry_rqbds, svc);
-
-                lc_watchdog_disable(thread->t_watchdog);
-
-                cfs_cond_resched();
-
-                l_wait_event_exclusive_head(svc->srv_waitq,
-                                       ptlrpc_thread_stopping(thread) ||
-                                       ptlrpc_server_request_waiting(svc) ||
-                                       ptlrpc_server_request_pending(svc, 0) ||
-                                       ptlrpc_rqbd_pending(svc) ||
-                                       ptlrpc_at_check(svc), &lwi);
-
-                if (ptlrpc_thread_stopping(thread))
+                if (ptlrpc_wait_event(svc, thread))
                         break;
 
-                lc_watchdog_touch(thread->t_watchdog, CFS_GET_TIMEOUT(svc));
-
                 ptlrpc_check_rqbd_pool(svc);
 
                 if (ptlrpc_threads_need_create(svc)) {
@@ -2303,7 +2291,7 @@ static int ptlrpc_hr_main(void *arg)
                  "ptlrpc_hr_%d", hr_args->thread_index);
 
         cfs_daemonize_ctxt(threadname);
-#if defined(CONFIG_SMP) && defined(HAVE_NODE_TO_CPUMASK)
+#if defined(CONFIG_NUMA) && defined(HAVE_NODE_TO_CPUMASK)
         cfs_set_cpus_allowed(cfs_current(),
                              node_to_cpumask(cpu_to_node(hr_args->cpu_index)));
 #endif
@@ -2341,8 +2329,7 @@ static int ptlrpc_start_hr_thread(struct ptlrpc_hr_service *hr, int n, int cpu)
         args.cpu_index = cpu;
         args.hrs = hr;
 
-        rc = cfs_kernel_thread(ptlrpc_hr_main, (void*)&args,
-                               CLONE_VM|CLONE_FILES);
+        rc = cfs_create_thread(ptlrpc_hr_main, (void*)&args, CFS_DAEMON_FLAGS);
         if (rc < 0) {
                 cfs_complete(&t->hrt_completion);
                 GOTO(out, rc);
@@ -2534,7 +2521,7 @@ int ptlrpc_start_thread(struct ptlrpc_service *svc)
         /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
          * just drop the VM and FILES in cfs_daemonize_ctxt() right away.
          */
-        rc = cfs_kernel_thread(ptlrpc_main, &d, CLONE_VM | CLONE_FILES);
+        rc = cfs_create_thread(ptlrpc_main, &d, CFS_DAEMON_FLAGS);
         if (rc < 0) {
                 CERROR("cannot start thread '%s': rc %d\n", name, rc);
 
@@ -2712,7 +2699,6 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
 
                 req = ptlrpc_server_request_get(service, 1);
                 cfs_list_del(&req->rq_list);
-                service->srv_n_queued_reqs--;
                 service->srv_n_active_reqs++;
                 ptlrpc_hpreq_fini(req);
                 ptlrpc_server_finish_request(service, req);
@@ -2738,7 +2724,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
         cfs_list_for_each_entry_safe(rs, t, &service->srv_free_rs_list,
                                      rs_list) {
                 cfs_list_del(&rs->rs_list);
-                OBD_FREE(rs, service->srv_max_reply_size);
+                OBD_FREE_LARGE(rs, service->srv_max_reply_size);
         }
 
         /* In case somebody rearmed this in the meantime */