Whamcloud - gitweb
Branch b1_8
authorbobijam <bobijam>
Wed, 8 Apr 2009 02:18:02 +0000 (02:18 +0000)
committerbobijam <bobijam>
Wed, 8 Apr 2009 02:18:02 +0000 (02:18 +0000)
b=18688
o=adilger
i=nathan.rutman
i=johann

Description: Allow tuning service thread via /proc
Details    : For each service a new
             /proc/fs/lustre/{service}/*/thread_{min,max,started} entry is
             created that can be used to set min/max thread counts, and get the
             current number of running threads.

lustre/ChangeLog
lustre/include/lustre_net.h
lustre/ptlrpc/lproc_ptlrpc.c
lustre/ptlrpc/recov_thread.c

index 8cbaace..3d85b65 100644 (file)
@@ -32,8 +32,16 @@ tbd Sun Microsystems, Inc.
          more information, please refer to bugzilla 17630.
 
 Severity   : enhancement
+Bugzilla   : 18688
+Description: Allow tuning service thread via /proc
+Details    : For each service a new
+            /proc/fs/lustre/{service}/*/thread_{min,max,started} entry is
+            created that can be used to set min/max thread counts, and get the
+            current number of running threads.
+
+Severity   : enhancement
 Bugzilla   : 18798
-Description: Add state history info file, enhance import info file 
+Description: Add state history info file, enhance import info file
 Details    : Track import connection state changes in a new osc/mdc proc file;
             add overview-type data to the osc/mdc import proc file.
 
index fc2366c..c030d58 100644 (file)
  * considered full when less than ?_MAXREQSIZE is left in them.
  */
 
-#define LDLM_THREADS_AUTO_MIN                                                 \
-        min((int)(num_online_cpus() * num_online_cpus() * 2), 8)
-#define LDLM_THREADS_AUTO_MAX (LDLM_THREADS_AUTO_MIN * 16)
+#define LDLM_THREADS_AUTO_MIN (2)
+#define LDLM_THREADS_AUTO_MAX (num_online_cpus() * num_online_cpus() * 32)
 #define LDLM_BL_THREADS  LDLM_THREADS_AUTO_MIN
 #define LDLM_NBUFS      (64 * num_online_cpus())
 #define LDLM_BUFSIZE    (8 * 1024)
@@ -1123,7 +1122,7 @@ int ptlrpc_pinger_del_import(struct obd_import *imp);
 int ptlrpc_add_timeout_client(int time, enum timeout_event event,
                               timeout_cb_t cb, void *data,
                               struct list_head *obd_list);
-int ptlrpc_del_timeout_client(struct list_head *obd_list, 
+int ptlrpc_del_timeout_client(struct list_head *obd_list,
                               enum timeout_event event);
 struct ptlrpc_request * ptlrpc_prep_ping(struct obd_import *imp);
 int ptlrpc_obd_ping(struct obd_device *obd);
index 5a55dca..476d9b7 100644 (file)
@@ -288,6 +288,81 @@ ptlrpc_lprocfs_write_req_history_max(struct file *file, const char *buffer,
         return count;
 }
 
+static int
+ptlrpc_lprocfs_rd_threads_min(char *page, char **start, off_t off,
+                              int count, int *eof, void *data)
+{
+        struct ptlrpc_service *svc = data;
+
+        return snprintf(page, count, "%d\n", svc->srv_threads_min);
+}
+
+static int
+ptlrpc_lprocfs_wr_threads_min(struct file *file, const char *buffer,
+                              unsigned long count, void *data)
+{
+        struct ptlrpc_service *svc = data;
+        int                    val;
+        int                    rc = lprocfs_write_helper(buffer, count, &val);
+
+        if (rc < 0)
+                return rc;
+
+        if (val < 2)
+                return -ERANGE;
+
+        if (val > svc->srv_threads_max)
+                return -ERANGE;
+
+        spin_lock(&svc->srv_lock);
+        svc->srv_threads_min = val;
+        spin_unlock(&svc->srv_lock);
+
+        return count;
+}
+
+static int
+ptlrpc_lprocfs_rd_threads_started(char *page, char **start, off_t off,
+                                  int count, int *eof, void *data)
+{
+        struct ptlrpc_service *svc = data;
+
+        return snprintf(page, count, "%d\n", svc->srv_threads_started);
+}
+
+static int
+ptlrpc_lprocfs_rd_threads_max(char *page, char **start, off_t off,
+                              int count, int *eof, void *data)
+{
+        struct ptlrpc_service *svc = data;
+
+        return snprintf(page, count, "%d\n", svc->srv_threads_max);
+}
+
+static int
+ptlrpc_lprocfs_wr_threads_max(struct file *file, const char *buffer,
+                              unsigned long count, void *data)
+{
+        struct ptlrpc_service *svc = data;
+        int                    val;
+        int                    rc = lprocfs_write_helper(buffer, count, &val);
+
+        if (rc < 0)
+                return rc;
+
+        if (val < 2)
+                return -ERANGE;
+
+        if (val < svc->srv_threads_min)
+                return -ERANGE;
+
+        spin_lock(&svc->srv_lock);
+        svc->srv_threads_max = val;
+        spin_unlock(&svc->srv_lock);
+
+        return count;
+}
+
 struct ptlrpc_srh_iterator {
         __u64                  srhi_seq;
         struct ptlrpc_request *srhi_req;
@@ -540,21 +615,31 @@ void ptlrpc_lprocfs_register_service(struct proc_dir_entry *entry,
                                      struct ptlrpc_service *svc)
 {
         struct lprocfs_vars lproc_vars[] = {
+                {.name       = "high_priority_ratio",
+                 .read_fptr  = ptlrpc_lprocfs_rd_hp_ratio,
+                 .write_fptr = ptlrpc_lprocfs_wr_hp_ratio,
+                 .data       = svc},
                 {.name       = "req_buffer_history_len",
-                 .write_fptr = NULL,
                  .read_fptr  = ptlrpc_lprocfs_read_req_history_len,
                  .data       = svc},
                 {.name       = "req_buffer_history_max",
                  .write_fptr = ptlrpc_lprocfs_write_req_history_max,
                  .read_fptr  = ptlrpc_lprocfs_read_req_history_max,
                  .data       = svc},
+                {.name       = "threads_min",
+                 .read_fptr  = ptlrpc_lprocfs_rd_threads_min,
+                 .write_fptr = ptlrpc_lprocfs_wr_threads_min,
+                 .data       = svc},
+                {.name       = "threads_max",
+                 .read_fptr  = ptlrpc_lprocfs_rd_threads_max,
+                 .write_fptr = ptlrpc_lprocfs_wr_threads_max,
+                 .data       = svc},
+                {.name       = "threads_started",
+                 .read_fptr  = ptlrpc_lprocfs_rd_threads_started,
+                 .data       = svc},
                 {.name       = "timeouts",
                  .read_fptr  = ptlrpc_lprocfs_rd_timeouts,
                  .data       = svc},
-                {.name       = "high_priority_ratio",
-                 .read_fptr  = ptlrpc_lprocfs_rd_hp_ratio,
-                 .write_fptr = ptlrpc_lprocfs_wr_hp_ratio,
-                 .data       = svc},
                 {NULL}
         };
         static struct file_operations req_history_fops = {
index 64a0eda..96ee8e9 100644 (file)
@@ -76,8 +76,8 @@ enum {
         LLOG_LCM_FL_EXIT        = 1 << 1
 };
 
-static void llcd_print(struct llog_canceld_ctxt *llcd, 
-                       const char *func, int line) 
+static void llcd_print(struct llog_canceld_ctxt *llcd,
+                       const char *func, int line)
 {
         CDEBUG(D_RPCTRACE, "Llcd (%p) at %s:%d:\n", llcd, func, line);
         CDEBUG(D_RPCTRACE, "  size: %d\n", llcd->llcd_size);
@@ -86,7 +86,7 @@ static void llcd_print(struct llog_canceld_ctxt *llcd,
         CDEBUG(D_RPCTRACE, "  cookiebytes : %d\n", llcd->llcd_cookiebytes);
 }
 
-/** 
+/**
  * Allocate new llcd from cache, init it and return to caller.
  * Bumps number of objects allocated.
  */
@@ -97,7 +97,7 @@ static struct llog_canceld_ctxt *llcd_alloc(struct llog_commit_master *lcm)
 
         LASSERT(lcm != NULL);
 
-        /* 
+        /*
          * We want to send one page of cookies with rpc header. This buffer
          * will be assigned later to the rpc, this is why we preserve the
          * space for rpc header.
@@ -145,14 +145,14 @@ static void llcd_free(struct llog_canceld_ctxt *llcd)
                 atomic_dec(&lcm->lcm_count);
                 spin_unlock(&lcm->lcm_lock);
 
-                CDEBUG(D_RPCTRACE, "Free llcd %p on lcm %p (%d)\n", 
+                CDEBUG(D_RPCTRACE, "Free llcd %p on lcm %p (%d)\n",
                        llcd, lcm, atomic_read(&lcm->lcm_count));
         }
 
         LASSERT(atomic_read(&llcd_count) > 0);
         atomic_dec(&llcd_count);
 
-        size = offsetof(struct llog_canceld_ctxt, llcd_cookies) + 
+        size = offsetof(struct llog_canceld_ctxt, llcd_cookies) +
             llcd->llcd_size;
         OBD_SLAB_FREE(llcd, llcd_cache, size);
 }
@@ -161,7 +161,7 @@ static void llcd_free(struct llog_canceld_ctxt *llcd)
  * Checks if passed cookie fits into llcd free space buffer. Returns
  * 1 if yes and 0 otherwise.
  */
-static inline int 
+static inline int
 llcd_fit(struct llog_canceld_ctxt *llcd, struct llog_cookie *cookies)
 {
         return (llcd->llcd_size - llcd->llcd_cookiebytes >= sizeof(*cookies));
@@ -170,11 +170,11 @@ llcd_fit(struct llog_canceld_ctxt *llcd, struct llog_cookie *cookies)
 /**
  * Copy passed @cookies to @llcd.
  */
-static inline void 
+static inline void
 llcd_copy(struct llog_canceld_ctxt *llcd, struct llog_cookie *cookies)
 {
         LASSERT(llcd_fit(llcd, cookies));
-        memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes, 
+        memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes,
               cookies, sizeof(*cookies));
         llcd->llcd_cookiebytes += sizeof(*cookies);
 }
@@ -184,7 +184,7 @@ llcd_copy(struct llog_canceld_ctxt *llcd, struct llog_cookie *cookies)
  * sending result. Error is passed in @rc. Note, that this will be called
  * in cleanup time when all inflight rpcs aborted.
  */
-static int 
+static int
 llcd_interpret(struct ptlrpc_request *req, void *noused, int rc)
 {
         struct llog_canceld_ctxt *llcd = req->rq_async_args.pointer_arg[0];
@@ -192,10 +192,10 @@ llcd_interpret(struct ptlrpc_request *req, void *noused, int rc)
         llcd_free(llcd);
         return 0;
 }
+
 /**
  * Send @llcd to remote node. Free llcd uppon completion or error. Sending
- * is performed in async style so this function will return asap without 
+ * is performed in async style so this function will return asap without
  * blocking.
  */
 static int llcd_send(struct llog_canceld_ctxt *llcd)
@@ -212,7 +212,7 @@ static int llcd_send(struct llog_canceld_ctxt *llcd)
 
         ctxt = llcd->llcd_ctxt;
         if (!ctxt) {
-                CERROR("Invalid llcd with NULL ctxt found (%p)\n", 
+                CERROR("Invalid llcd with NULL ctxt found (%p)\n",
                        llcd);
                 llcd_print(llcd, __FUNCTION__, __LINE__);
                 LBUG();
@@ -224,9 +224,9 @@ static int llcd_send(struct llog_canceld_ctxt *llcd)
 
         lcm = llcd->llcd_lcm;
 
-        /* 
+        /*
          * Check if we're in exit stage. Do not send llcd in
-         * this case. 
+         * this case.
          */
         if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags))
                 GOTO(exit, rc = -ENODEV);
@@ -234,9 +234,9 @@ static int llcd_send(struct llog_canceld_ctxt *llcd)
         CDEBUG(D_RPCTRACE, "Sending llcd %p\n", llcd);
 
         import = llcd->llcd_ctxt->loc_imp;
-        if (!import || (import == LP_POISON) || 
+        if (!import || (import == LP_POISON) ||
             (import->imp_client == LP_POISON)) {
-                CERROR("Invalid import %p for llcd %p\n", 
+                CERROR("Invalid import %p for llcd %p\n",
                        import, llcd);
                 GOTO(exit, rc = -ENODEV);
         }
@@ -244,20 +244,20 @@ static int llcd_send(struct llog_canceld_ctxt *llcd)
         OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_RECOV, 10);
 
         /*
-         * No need to get import here as it is already done in 
+         * No need to get import here as it is already done in
          * llog_receptor_accept().
          */
         req = ptlrpc_prep_req(import, LUSTRE_LOG_VERSION,
                               OBD_LOG_CANCEL, 2, size, bufs);
         if (req == NULL) {
-                CERROR("Can't allocate request for sending llcd %p\n", 
+                CERROR("Can't allocate request for sending llcd %p\n",
                        llcd);
                 GOTO(exit, rc = -ENOMEM);
         }
 
-        /* 
+        /*
          * Check if we're in exit stage again. Do not send llcd in
-         * this case. 
+         * this case.
          */
         if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) {
                 ptlrpc_req_finished(req);
@@ -322,7 +322,7 @@ static struct llog_canceld_ctxt *llcd_detach(struct llog_ctxt *ctxt)
         if (!llcd)
                 return NULL;
 
-        CDEBUG(D_RPCTRACE, "Detach llcd %p from ctxt %p\n", 
+        CDEBUG(D_RPCTRACE, "Detach llcd %p from ctxt %p\n",
                llcd, ctxt);
 
         ctxt->loc_llcd = NULL;
@@ -369,7 +369,7 @@ static int llcd_push(struct llog_ctxt *ctxt)
         int rc;
 
         /*
-         * Make sure that this llcd will not be sent again as we detach 
+         * Make sure that this llcd will not be sent again as we detach
          * it from ctxt.
          */
         llcd = llcd_detach(ctxt);
@@ -378,7 +378,7 @@ static int llcd_push(struct llog_ctxt *ctxt)
                 llcd_print(llcd, __FUNCTION__, __LINE__);
                 LBUG();
         }
-        
+
         rc = llcd_send(llcd);
         if (rc)
                 CERROR("Couldn't send llcd %p (%d)\n", llcd, rc);
@@ -397,7 +397,7 @@ int llog_recov_thread_start(struct llog_commit_master *lcm)
 
         rc = ptlrpcd_start(lcm->lcm_name, &lcm->lcm_pc);
         if (rc) {
-                CERROR("Error %d while starting recovery thread %s\n", 
+                CERROR("Error %d while starting recovery thread %s\n",
                        rc, lcm->lcm_name);
                 RETURN(rc);
         }
@@ -413,7 +413,7 @@ void llog_recov_thread_stop(struct llog_commit_master *lcm, int force)
         ENTRY;
 
         /*
-         * Let all know that we're stopping. This will also make 
+         * Let all know that we're stopping. This will also make
          * llcd_send() refuse any new llcds.
          */
         set_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags);
@@ -423,7 +423,7 @@ void llog_recov_thread_stop(struct llog_commit_master *lcm, int force)
          * for processing now.
          */
         ptlrpcd_stop(&lcm->lcm_pc, force);
-        
+
         /*
          * By this point no alive inflight llcds should be left. Only
          * those forgotten in sync may still be attached to ctxt. Let's
@@ -433,7 +433,7 @@ void llog_recov_thread_stop(struct llog_commit_master *lcm, int force)
                 struct llog_canceld_ctxt *llcd;
                 struct list_head         *tmp;
 
-                CERROR("Busy llcds found (%d) on lcm %p\n", 
+                CERROR("Busy llcds found (%d) on lcm %p\n",
                        atomic_read(&lcm->lcm_count) == 0, lcm);
 
                 spin_lock(&lcm->lcm_lock);
@@ -443,7 +443,7 @@ void llog_recov_thread_stop(struct llog_commit_master *lcm, int force)
                         llcd_print(llcd, __FUNCTION__, __LINE__);
                 }
                 spin_unlock(&lcm->lcm_lock);
-                
+
                 /*
                  * No point to go further with busy llcds at this point
                  * as this is clear bug. It might mean we got hanging
@@ -476,8 +476,8 @@ struct llog_commit_master *llog_recov_thread_init(char *name)
         /*
          * Try to create threads with unique names.
          */
-        snprintf(lcm->lcm_name, sizeof(lcm->lcm_name), 
-                 "ll_log_commit_%s", name);
+        snprintf(lcm->lcm_name, sizeof(lcm->lcm_name),
+                 "lcm_%s", name);
 
         atomic_set(&lcm->lcm_count, 0);
         spin_lock_init(&lcm->lcm_lock);
@@ -506,7 +506,7 @@ void llog_recov_thread_fini(struct llog_commit_master *lcm, int force)
 }
 EXPORT_SYMBOL(llog_recov_thread_fini);
 
-static int llog_recov_thread_replay(struct llog_ctxt *ctxt, 
+static int llog_recov_thread_replay(struct llog_ctxt *ctxt,
                                     void *cb, void *arg)
 {
         struct obd_device *obd = ctxt->loc_obd;
@@ -535,7 +535,7 @@ static int llog_recov_thread_replay(struct llog_ctxt *ctxt,
                 OBD_FREE_PTR(lpca);
                 RETURN(-ENODEV);
         }
-        rc = cfs_kernel_thread(llog_cat_process_thread, lpca, 
+        rc = cfs_kernel_thread(llog_cat_process_thread, lpca,
                                CLONE_VM | CLONE_FILES);
         if (rc < 0) {
                 CERROR("Error starting llog_cat_process_thread(): %d\n", rc);
@@ -556,14 +556,14 @@ int llog_obd_repl_connect(struct llog_ctxt *ctxt,
         int rc;
         ENTRY;
 
-        /* 
+        /*
          * Send back cached llcd from llog before recovery if we have any.
          * This is void is nothing cached is found there.
          */
         llog_sync(ctxt, NULL);
 
-        /* 
-         * Start recovery in separate thread. 
+        /*
+         * Start recovery in separate thread.
          */
         mutex_down(&ctxt->loc_sem);
         ctxt->loc_gen = *gen;
@@ -574,7 +574,7 @@ int llog_obd_repl_connect(struct llog_ctxt *ctxt,
 }
 EXPORT_SYMBOL(llog_obd_repl_connect);
 
-/** 
+/**
  * Deleted objects have a commit callback that cancels the MDS
  * log record for the deletion. The commit callback calls this
  * function.
@@ -604,7 +604,7 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
         }
 
         if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) {
-                CDEBUG(D_RPCTRACE, "Commit thread is stopping for ctxt %p\n", 
+                CDEBUG(D_RPCTRACE, "Commit thread is stopping for ctxt %p\n",
                        ctxt);
                 GOTO(out, rc = -ENODEV);
         }
@@ -613,7 +613,7 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
 
         if (count > 0 && cookies != NULL) {
                 /*
-                 * Get new llcd from ctxt if required. 
+                 * Get new llcd from ctxt if required.
                  */
                 if (!llcd) {
                         llcd = llcd_get(ctxt);
@@ -628,8 +628,8 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
                 }
 
                 /*
-                 * Llcd does not have enough room for @cookies. Let's push 
-                 * it out and allocate new one. 
+                 * Llcd does not have enough room for @cookies. Let's push
+                 * it out and allocate new one.
                  */
                 if (!llcd_fit(llcd, cookies)) {
                         rc = llcd_push(ctxt);
@@ -677,8 +677,8 @@ int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp)
         int rc = 0;
         ENTRY;
 
-        /* 
-         * Flush any remaining llcd. 
+        /*
+         * Flush any remaining llcd.
          */
         mutex_down(&ctxt->loc_sem);
         if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {
@@ -690,10 +690,10 @@ int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp)
                 llcd_put(ctxt);
                 mutex_up(&ctxt->loc_sem);
         } else {
-                /* 
+                /*
                  * This is either llog_sync() from generic llog code or sync
                  * on client disconnect. In either way let's do it and send
-                 * llcds to the target with waiting for completion. 
+                 * llcds to the target with waiting for completion.
                  */
                 CDEBUG(D_RPCTRACE, "Sync cached llcd\n");
                 mutex_up(&ctxt->loc_sem);
@@ -720,7 +720,7 @@ int llog_recov_init(void)
 {
         int llcd_size;
 
-        llcd_size = CFS_PAGE_SIZE - 
+        llcd_size = CFS_PAGE_SIZE -
                 lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL);
         llcd_size += offsetof(struct llog_canceld_ctxt, llcd_cookies);
         llcd_cache = cfs_mem_cache_create("llcd_cache", llcd_size, 0, 0);
@@ -737,7 +737,7 @@ int llog_recov_init(void)
 void llog_recov_fini(void)
 {
         /*
-         * Kill llcd cache when thread is stopped and we're sure no 
+         * Kill llcd cache when thread is stopped and we're sure no
          * llcd in use left.
          */
         if (llcd_cache) {
@@ -745,7 +745,7 @@ void llog_recov_fini(void)
                  * In 2.6.22 cfs_mem_cache_destroy() will not return error
                  * for busy resources. Let's check it another way.
                  */
-                LASSERTF(atomic_read(&llcd_count) == 0, 
+                LASSERTF(atomic_read(&llcd_count) == 0,
                          "Can't destroy llcd cache! Number of "
                          "busy llcds: %d\n", atomic_read(&llcd_count));
                 cfs_mem_cache_destroy(llcd_cache);