static struct llog_commit_master *lcm = &lustre_lcm;
/* Allocate new commit structs in case we do not have enough */
-static struct llog_commit_data *llcd_alloc(void)
+static int llcd_alloc(void)
{
struct llog_commit_data *llcd;
OBD_ALLOC(llcd, PAGE_SIZE);
if (llcd == NULL)
- return NULL;
+ return -ENOMEM;
llcd->llcd_lcm = lcm;
atomic_inc(&lcm->lcm_llcd_numfree);
spin_unlock(&lcm->lcm_llcd_lock);
- return llcd;
+ return 0;
}
/* Get a free cookie struct from the list */
spin_lock(&lcm->lcm_llcd_lock);
if (list_empty(&lcm->lcm_llcd_free)) {
spin_unlock(&lcm->lcm_llcd_lock);
- CERROR("no free log commit data structs!\n");
- llcd = llcd_alloc();
- return llcd;
+ if (llcd_alloc() < 0) {
+ CERROR("unable to allocate log commit data!\n");
+ return NULL;
+ }
+ spin_lock(&lcm->lcm_llcd_lock);
}
- llcd = list_entry(&lcm->lcm_llcd_free.next, typeof(*llcd), llcd_list);
+ llcd = list_entry(lcm->lcm_llcd_free.next, typeof(*llcd), llcd_list);
list_del(&llcd->llcd_list);
atomic_dec(&lcm->lcm_llcd_numfree);
spin_unlock(&lcm->lcm_llcd_lock);
CDEBUG(D_HA, "%s started\n", current->comm);
do {
struct ptlrpc_request *request;
- struct lustre_handle *conn;
+ struct lustre_handle conn;
struct list_head *sending_list;
/* If we do not have enough pages available, allocate some */
}
if (atomic_read(&lcm->lcm_thread_numidle) <= 1 &&
- atomic_read(&lcm->lcm_thread_numidle) <
- lcm->lcm_thread_max) {
+ lcm->lcm_thread_total < lcm->lcm_thread_max) {
rc = llog_start_commit_thread();
if (rc < 0)
CERROR("error starting thread: rc %d\n", rc);
/* Move all of the pending cancels from the same OST off of
* the list, so we don't get multiple threads blocked and/or
- * doing upcalls on the same OST in case failure.
+ * doing upcalls on the same OST in case of failure.
*/
spin_lock(&lcm->lcm_llcd_lock);
if (!list_empty(sending_list)) {
- list_move(sending_list, &lcd->lcd_llcd_list);
- llcd = list_entry(&lcd->lcd_llcd_list.next,
+ list_move_tail(sending_list->next,
+ &lcd->lcd_llcd_list);
+ llcd = list_entry(lcd->lcd_llcd_list.next,
typeof(*llcd), llcd_list);
+ LASSERT(llcd->llcd_lcm == lcm);
conn = llcd->llcd_conn;
}
list_for_each_entry_safe(llcd, n, sending_list, llcd_list) {
- if (conn == llcd->llcd_conn)
- list_move_tail(&llcd->llcd_list, sending_list);
+ LASSERT(llcd->llcd_lcm == lcm);
+ if (memcmp(&conn, &llcd->llcd_conn, sizeof(conn)) == 0)
+ list_move_tail(&llcd->llcd_list,
+ &lcd->lcd_llcd_list);
}
if (sending_list != &lcm->lcm_llcd_resend) {
list_for_each_entry_safe(llcd, n, &lcm->lcm_llcd_resend,
llcd_list) {
- if (conn == llcd->llcd_conn)
+ LASSERT(llcd->llcd_lcm == lcm);
+ if (memcmp(&conn, &llcd->llcd_conn,
+ sizeof(conn)) == 0)
list_move_tail(&llcd->llcd_list,
- &lcm->lcm_llcd_resend);
+ &lcd->lcd_llcd_list);
}
}
spin_unlock(&lcm->lcm_llcd_lock);
/* We are the only one manipulating our local list - no lock */
list_for_each_entry_safe(llcd,n, &lcd->lcd_llcd_list,llcd_list){
+ char *bufs[1] = {(char *)llcd->llcd_cookies};
list_del(&llcd->llcd_list);
- request = ptlrpc_prep_req(class_conn2cliimp(conn),
+ request = ptlrpc_prep_req(class_conn2cliimp(&conn),
OST_LOG_CANCEL, 1,
&llcd->llcd_cookiebytes,
- (char **)&llcd->llcd_cookies);
+ bufs);
if (request == NULL) {
rc = -ENOMEM;
CERROR("error preparing commit: rc %d\n", rc);
spin_lock(&lcm->lcm_llcd_lock);
- list_splice(&lcd->lcd_llcd_list,
- &lcm->lcm_llcd_resend);
+ list_splice_init(&lcd->lcd_llcd_list,
+ &lcm->lcm_llcd_resend);
spin_unlock(&lcm->lcm_llcd_lock);
break;
}
request->rq_replen = lustre_msg_size(0, NULL);
rc = ptlrpc_queue_wait(request);
+ ptlrpc_req_finished(request);
/* If the RPC failed, we put this and the remaining
- * messages onto the resend list for another time.
- */
- if (rc) {
- spin_lock(&lcm->lcm_llcd_lock);
- list_splice(&lcd->lcd_llcd_list,
- &lcm->lcm_llcd_resend);
- if (++llcd->llcd_tries < 5) {
- CERROR("commit %p failed %dx: rc %d\n",
- llcd, llcd->llcd_tries, rc);
-
- list_add_tail(&llcd->llcd_list,
- &lcm->lcm_llcd_resend);
- spin_unlock(&lcm->lcm_llcd_lock);
- } else {
- spin_unlock(&lcm->lcm_llcd_lock);
- CERROR("commit %p dropped %d cookies: "
- "rc %d\n", llcd,
- llcd->llcd_cookiebytes /
- sizeof(*llcd->llcd_cookies), rc);
- llcd_put(llcd);
- }
- break;
- } else
+ * messages onto the resend list for another time. */
+ if (rc == 0) {
llcd_put(llcd);
- ptlrpc_req_finished(request);
+ continue;
+ }
+
+ spin_lock(&lcm->lcm_llcd_lock);
+ list_splice(&lcd->lcd_llcd_list, &lcm->lcm_llcd_resend);
+ if (++llcd->llcd_tries < 5) {
+ CERROR("commit %p failed %dx: rc %d\n",
+ llcd, llcd->llcd_tries, rc);
+
+ list_add_tail(&llcd->llcd_list,
+ &lcm->lcm_llcd_resend);
+ spin_unlock(&lcm->lcm_llcd_lock);
+ } else {
+ spin_unlock(&lcm->lcm_llcd_lock);
+ CERROR("commit %p dropped %d cookies: rc %d\n",
+ llcd, llcd->llcd_cookiebytes /
+ sizeof(*llcd->llcd_cookies), rc);
+ llcd_put(llcd);
+ }
+ break;
}
if (rc == 0) {
INIT_LIST_HEAD(&lcm->lcm_llcd_free);
spin_lock_init(&lcm->lcm_llcd_lock);
atomic_set(&lcm->lcm_llcd_numfree, 0);
+ lcm->lcm_llcd_minfree = 0;
return 0;
}