Whamcloud - gitweb
- fixes the maximum OST packet size, a lot of list management, and a
authorpschwan <pschwan>
Wed, 25 Jun 2003 22:38:32 +0000 (22:38 +0000)
committerpschwan <pschwan>
Wed, 25 Jun 2003 22:38:32 +0000 (22:38 +0000)
  couple other bugs
- starts threads at OST and MDS startup
- able to complete a round of create/setattr/cancel if SENDNOW is set
- some corruption happens when I ask it to build up and cancel whole
  pages; I need to run on multiple systems to start narrowing down where.

lustre/include/linux/lustre_commit_confd.h
lustre/obdclass/recov_log.c
lustre/ptlrpc/recov_thread.c

index 3bb61ed..0ec850e 100644 (file)
@@ -28,7 +28,7 @@
 
 struct llog_commit_data {
         struct list_head           llcd_list;  /* free or pending struct list */
-        struct lustre_handle      *llcd_conn;  /* which osc is cancel target */
+        struct lustre_handle       llcd_conn;  /* which osc is cancel target */
         struct llog_commit_master *llcd_lcm;
         int                        llcd_tries; /* number of tries to send */
         int                        llcd_cookiebytes;
@@ -65,6 +65,7 @@ struct llog_commit_daemon {
         struct llog_commit_master *lcd_lcm;       /* pointer back to parent */
 };
 
+/* ptlrpc/recov_thread.c */
 int llog_start_commit_thread(void);
 struct llog_commit_data *llcd_grab(void);
 void llcd_send(struct llog_commit_data *llcd);
index 2b19986..80a9cb7 100644 (file)
@@ -223,6 +223,7 @@ static struct llog_handle *llog_current_log(struct llog_handle *cathandle,
                 loghandle = llog_new_log(cathandle, cathandle->lgh_tgtuuid);
                 GOTO(out, loghandle);
         }
+        GOTO(out, loghandle);
 out:
         return loghandle;
 }
@@ -418,8 +419,7 @@ out:
 int llog_cancel_records(struct llog_handle *cathandle, int count,
                         struct llog_cookie *cookies)
 {
-        int rc = 0;
-        int i;
+        int i, rc = 0;
         ENTRY;
 
         down(&cathandle->lgh_lock);
@@ -439,7 +439,7 @@ int llog_cancel_records(struct llog_handle *cathandle, int count,
                 llh = loghandle->lgh_hdr;
                 CDEBUG(D_HA, "cancelling "LPX64" index %u: %u\n",
                        lgl->lgl_oid, cookies->lgc_index,
-                        ext2_test_bit(cookies->lgc_index, llh->llh_bitmap));
+                       ext2_test_bit(cookies->lgc_index, llh->llh_bitmap));
                 if (!ext2_clear_bit(cookies->lgc_index, llh->llh_bitmap)) {
                         CERROR("log index %u in "LPX64":%x already clear?\n",
                                cookies->lgc_index, lgl->lgl_oid, lgl->lgl_ogen);
index 99321a9..5b92329 100644 (file)
@@ -41,13 +41,13 @@ static struct llog_commit_master lustre_lcm;
 static struct llog_commit_master *lcm = &lustre_lcm;
 
 /* Allocate new commit structs in case we do not have enough */
-static struct llog_commit_data *llcd_alloc(void)
+static int llcd_alloc(void)
 {
         struct llog_commit_data *llcd;
 
         OBD_ALLOC(llcd, PAGE_SIZE);
         if (llcd == NULL)
-                return NULL;
+                return -ENOMEM;
 
         llcd->llcd_lcm = lcm;
 
@@ -56,7 +56,7 @@ static struct llog_commit_data *llcd_alloc(void)
         atomic_inc(&lcm->lcm_llcd_numfree);
         spin_unlock(&lcm->lcm_llcd_lock);
 
-        return llcd;
+        return 0;
 }
 
 /* Get a free cookie struct from the list */
@@ -67,12 +67,14 @@ struct llog_commit_data *llcd_grab(void)
         spin_lock(&lcm->lcm_llcd_lock);
         if (list_empty(&lcm->lcm_llcd_free)) {
                 spin_unlock(&lcm->lcm_llcd_lock);
-                CERROR("no free log commit data structs!\n");
-                llcd = llcd_alloc();
-                return llcd;
+                if (llcd_alloc() < 0) {
+                        CERROR("unable to allocate log commit data!\n");
+                        return NULL;
+                }
+                spin_lock(&lcm->lcm_llcd_lock);
         }
 
-        llcd = list_entry(&lcm->lcm_llcd_free.next, typeof(*llcd), llcd_list);
+        llcd = list_entry(lcm->lcm_llcd_free.next, typeof(*llcd), llcd_list);
         list_del(&llcd->llcd_list);
         atomic_dec(&lcm->lcm_llcd_numfree);
         spin_unlock(&lcm->lcm_llcd_lock);
@@ -139,7 +141,7 @@ static int log_commit_thread(void *arg)
         CDEBUG(D_HA, "%s started\n", current->comm);
         do {
                 struct ptlrpc_request *request;
-                struct lustre_handle *conn;
+                struct lustre_handle conn;
                 struct list_head *sending_list;
 
                 /* If we do not have enough pages available, allocate some */
@@ -179,8 +181,7 @@ static int log_commit_thread(void *arg)
                 }
 
                 if (atomic_read(&lcm->lcm_thread_numidle) <= 1 &&
-                    atomic_read(&lcm->lcm_thread_numidle) <
-                    lcm->lcm_thread_max) {
+                    lcm->lcm_thread_total < lcm->lcm_thread_max) {
                         rc = llog_start_commit_thread();
                         if (rc < 0)
                                 CERROR("error starting thread: rc %d\n", rc);
@@ -188,77 +189,83 @@ static int log_commit_thread(void *arg)
 
                 /* Move all of the pending cancels from the same OST off of
                  * the list, so we don't get multiple threads blocked and/or
-                 * doing upcalls on the same OST in case failure.
+                 * doing upcalls on the same OST in case of failure.
                  */
                 spin_lock(&lcm->lcm_llcd_lock);
                 if (!list_empty(sending_list)) {
-                        list_move(sending_list, &lcd->lcd_llcd_list);
-                        llcd = list_entry(&lcd->lcd_llcd_list.next,
+                        list_move_tail(sending_list->next,
+                                       &lcd->lcd_llcd_list);
+                        llcd = list_entry(lcd->lcd_llcd_list.next,
                                           typeof(*llcd), llcd_list);
+                        LASSERT(llcd->llcd_lcm == lcm);
                         conn = llcd->llcd_conn;
                 }
                 list_for_each_entry_safe(llcd, n, sending_list, llcd_list) {
-                        if (conn == llcd->llcd_conn)
-                                list_move_tail(&llcd->llcd_list, sending_list);
+                        LASSERT(llcd->llcd_lcm == lcm);
+                        if (memcmp(&conn, &llcd->llcd_conn, sizeof(conn)) == 0)
+                                list_move_tail(&llcd->llcd_list,
+                                               &lcd->lcd_llcd_list);
                 }
                 if (sending_list != &lcm->lcm_llcd_resend) {
                         list_for_each_entry_safe(llcd, n, &lcm->lcm_llcd_resend,
                                                  llcd_list) {
-                                if (conn == llcd->llcd_conn)
+                                LASSERT(llcd->llcd_lcm == lcm);
+                                if (memcmp(&conn, &llcd->llcd_conn,
+                                           sizeof(conn)) == 0)
                                         list_move_tail(&llcd->llcd_list,
-                                                       &lcm->lcm_llcd_resend);
+                                                       &lcd->lcd_llcd_list);
                         }
                 }
                 spin_unlock(&lcm->lcm_llcd_lock);
 
                 /* We are the only one manipulating our local list - no lock */
                 list_for_each_entry_safe(llcd,n, &lcd->lcd_llcd_list,llcd_list){
+                        char *bufs[1] = {(char *)llcd->llcd_cookies};
                         list_del(&llcd->llcd_list);
 
-                        request = ptlrpc_prep_req(class_conn2cliimp(conn),
+                        request = ptlrpc_prep_req(class_conn2cliimp(&conn),
                                                   OST_LOG_CANCEL, 1,
                                                   &llcd->llcd_cookiebytes,
-                                                  (char **)&llcd->llcd_cookies);
+                                                  bufs);
                         if (request == NULL) {
                                 rc = -ENOMEM;
                                 CERROR("error preparing commit: rc %d\n", rc);
 
                                 spin_lock(&lcm->lcm_llcd_lock);
-                                list_splice(&lcd->lcd_llcd_list,
-                                            &lcm->lcm_llcd_resend);
+                                list_splice_init(&lcd->lcd_llcd_list,
+                                                 &lcm->lcm_llcd_resend);
                                 spin_unlock(&lcm->lcm_llcd_lock);
                                 break;
                         }
 
                         request->rq_replen = lustre_msg_size(0, NULL);
                         rc = ptlrpc_queue_wait(request);
+                        ptlrpc_req_finished(request);
 
                         /* If the RPC failed, we put this and the remaining
-                         * messages onto the resend list for another time.
-                         */
-                        if (rc) {
-                                spin_lock(&lcm->lcm_llcd_lock);
-                                list_splice(&lcd->lcd_llcd_list,
-                                            &lcm->lcm_llcd_resend);
-                                if (++llcd->llcd_tries < 5) {
-                                        CERROR("commit %p failed %dx: rc %d\n",
-                                               llcd, llcd->llcd_tries, rc);
-
-                                        list_add_tail(&llcd->llcd_list,
-                                                      &lcm->lcm_llcd_resend);
-                                        spin_unlock(&lcm->lcm_llcd_lock);
-                                } else {
-                                        spin_unlock(&lcm->lcm_llcd_lock);
-                                        CERROR("commit %p dropped %d cookies: "
-                                               "rc %d\n", llcd,
-                                               llcd->llcd_cookiebytes /
-                                               sizeof(*llcd->llcd_cookies), rc);
-                                       llcd_put(llcd);
-                                }
-                                break;
-                        } else
+                         * messages onto the resend list for another time. */
+                        if (rc == 0) {
                                 llcd_put(llcd);
-                        ptlrpc_req_finished(request);
+                                continue;
+                        }
+
+                        spin_lock(&lcm->lcm_llcd_lock);
+                        list_splice(&lcd->lcd_llcd_list, &lcm->lcm_llcd_resend);
+                        if (++llcd->llcd_tries < 5) {
+                                CERROR("commit %p failed %dx: rc %d\n",
+                                       llcd, llcd->llcd_tries, rc);
+
+                                list_add_tail(&llcd->llcd_list,
+                                              &lcm->lcm_llcd_resend);
+                                spin_unlock(&lcm->lcm_llcd_lock);
+                        } else {
+                                spin_unlock(&lcm->lcm_llcd_lock);
+                                CERROR("commit %p dropped %d cookies: rc %d\n",
+                                       llcd, llcd->llcd_cookiebytes /
+                                       sizeof(*llcd->llcd_cookies), rc);
+                                llcd_put(llcd);
+                        }
+                        break;
                 }
 
                 if (rc == 0) {
@@ -313,6 +320,7 @@ int llog_init_commit_master(void)
         INIT_LIST_HEAD(&lcm->lcm_llcd_free);
         spin_lock_init(&lcm->lcm_llcd_lock);
         atomic_set(&lcm->lcm_llcd_numfree, 0);
+        lcm->lcm_llcd_minfree = 0;
         return 0;
 }