1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2003 Cluster File Systems, Inc.
5 * Author: Andreas Dilger <adilger@clusterfs.com>
7 * This file is part of Lustre, http://www.lustre.org.
9 * Lustre is free software; you can redistribute it and/or
10 * modify it under the terms of version 2 of the GNU General Public
11 * License as published by the Free Software Foundation.
13 * Lustre is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with Lustre; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 * OST<->MDS recovery logging thread.
24 * Invariants in implementation:
25 * - we do not share logs among different OST<->MDS connections, so that
26 * if an OST or MDS fails it need only look at log(s) relevant to itself
29 #define DEBUG_SUBSYSTEM S_LOG
32 # define EXPORT_SYMTAB
36 # include <portals/list.h>
37 # include <liblustre.h>
39 #include <linux/kp30.h>
41 #include <linux/obd_class.h>
42 #include <linux/lustre_commit_confd.h>
43 #include <linux/obd_support.h>
44 #include <linux/obd_class.h>
45 #include <linux/lustre_net.h>
46 #include <portals/types.h>
47 #include <portals/list.h>
48 #include "ptlrpc_internal.h"
/* Single file-global commit master: all llcd free/pending/resend lists,
 * their spinlock, the daemon wait queue and the thread counters live in
 * this one structure.  Every function below operates on it through the
 * 'lcm' pointer. */
50 static struct llog_commit_master lustre_lcm;
51 static struct llog_commit_master *lcm = &lustre_lcm;
53 /* Allocate new commit structs in case we do not have enough */
/* Allocate one PAGE_SIZE llog_commit_data and add it to the global free
 * list under lcm_llcd_lock, bumping lcm_llcd_numfree to match.
 * NOTE(review): the embedded numbering of this listing has gaps (55, 57,
 * 59-63, 68+ are missing) — the opening brace, the OBD_ALLOC failure
 * check and the return statement are presumably among the dropped lines.
 * Verify against the pristine source before modifying this function. */
54 static int llcd_alloc(void)
56 struct llog_commit_data *llcd;
58 OBD_ALLOC(llcd, PAGE_SIZE);
        /* Publish the new llcd on the shared free list; counter is kept
         * in lockstep with the list under the same lock. */
64 spin_lock(&lcm->lcm_llcd_lock);
65 list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);
66 atomic_inc(&lcm->lcm_llcd_numfree);
67 spin_unlock(&lcm->lcm_llcd_lock);
72 /* Get a free cookie struct from the list */
/* Pop one llog_commit_data off lcm_llcd_free, replenishing the list via
 * llcd_alloc() if it is empty.  The returned llcd has llcd_cookiebytes
 * reset to 0 so the caller can start filling in cookies.
 * NOTE(review): numbering gaps (74, 76, 82-83, 85-86, 91-92, 94-96)
 * indicate dropped lines — most likely the opening brace, the NULL
 * return on allocation failure, the retry/close of the list_empty
 * branch, and the final return of llcd.  Confirm against the pristine
 * source; as shown, the lock-drop/reacquire around llcd_alloc() means
 * the free list must be re-checked after re-locking. */
73 struct llog_commit_data *llcd_grab(void)
75 struct llog_commit_data *llcd;
77 spin_lock(&lcm->lcm_llcd_lock);
78 if (list_empty(&lcm->lcm_llcd_free)) {
        /* Drop the spinlock before allocating: OBD_ALLOC may sleep. */
79 spin_unlock(&lcm->lcm_llcd_lock);
80 if (llcd_alloc() < 0) {
81 CERROR("unable to allocate log commit data!\n");
84 spin_lock(&lcm->lcm_llcd_lock);
        /* Detach the first free entry; counter mirrors the list. */
87 llcd = list_entry(lcm->lcm_llcd_free.next, typeof(*llcd), llcd_list);
88 list_del(&llcd->llcd_list);
89 atomic_dec(&lcm->lcm_llcd_numfree);
90 spin_unlock(&lcm->lcm_llcd_lock);
93 llcd->llcd_cookiebytes = 0;
97 EXPORT_SYMBOL(llcd_grab);
/* Return an llcd: free the page outright when the free list already
 * holds lcm_llcd_maxfree entries, otherwise recycle it onto the free
 * list.  NOTE(review): line 103 of the original is missing from this
 * listing — presumably the '} else {' separating the two branches;
 * confirm before editing. */
99 static void llcd_put(struct llog_commit_data *llcd)
101 if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) {
102 OBD_FREE(llcd, PAGE_SIZE);
104 spin_lock(&lcm->lcm_llcd_lock);
105 list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);
106 atomic_inc(&lcm->lcm_llcd_numfree);
107 spin_unlock(&lcm->lcm_llcd_lock);
111 /* Send some cookies to the appropriate target */
/* Queue a filled llcd on its commit master's pending list and wake
 * exactly one commit daemon to process it.  Note the list/lock are
 * reached through llcd->llcd_lcm rather than the file-global 'lcm'. */
112 void llcd_send(struct llog_commit_data *llcd)
114 spin_lock(&llcd->llcd_lcm->lcm_llcd_lock);
115 list_add_tail(&llcd->llcd_list, &llcd->llcd_lcm->lcm_llcd_pending);
116 spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock);
        /* Wake a single waiter — one daemon per batch avoids a thundering
         * herd on every queued llcd. */
118 wake_up_nr(&llcd->llcd_lcm->lcm_waitq, 1);
120 EXPORT_SYMBOL(llcd_send);
/* Commit daemon: repeatedly waits for pending llcds, batches together all
 * llcds bound for the same import (same OST), and sends each batch as an
 * OBD_LOG_CANCEL RPC.  Failed batches go to lcm_llcd_resend and are
 * retried up to 5 times before their cookies are dropped.  Exits when
 * LLOG_LCM_FL_EXIT is set and the pending work is drained (or
 * immediately on LLOG_LCM_FL_EXIT_FORCE, dropping all cookies).
 * NOTE(review): this listing has many numbering gaps (127-129, 131-134,
 * 136, 139, 141, 147-148, 151-152, 154, 158-159, 164-166, 171, 175,
 * 182, 184, 189, 192-194, 198, 200-201, 213, 219, 227-228, 230, 235,
 * 238, 240, 242, 248-250, 254, 257-261, 267, 271, 276-282, 285-288,
 * 296, 298-300, 302, 307, 309-311) — the main do/while construct, the
 * OBD_ALLOC failure check, 'break'/'continue'/'goto' statements and
 * several closing braces are among the dropped lines.  Do not edit this
 * function without the pristine source. */
122 static int log_commit_thread(void *arg)
124 struct llog_commit_master *lcm = arg;
125 struct llog_commit_daemon *lcd;
126 struct llog_commit_data *llcd, *n;
130 OBD_ALLOC(lcd, sizeof(*lcd));
135 ptlrpc_daemonize(); /* thread never needs to do IO */
        /* Block every signal: daemon lifetime is controlled solely via
         * lcm_flags + lcm_waitq, never by signals. */
137 SIGNAL_MASK_LOCK(current, flags);
138 sigfillset(&current->blocked);
140 SIGNAL_MASK_UNLOCK(current, flags);
        /* Name the thread with its ordinal under lcm_thread_lock so two
         * daemons starting concurrently cannot pick the same number. */
142 spin_lock(&lcm->lcm_thread_lock);
143 THREAD_NAME(current->comm, "ll_log_commit_%d",
144 atomic_read(&lcm->lcm_thread_total));
145 atomic_inc(&lcm->lcm_thread_total);
146 spin_unlock(&lcm->lcm_thread_lock);
149 INIT_LIST_HEAD(&lcd->lcd_lcm_list);
150 INIT_LIST_HEAD(&lcd->lcd_llcd_list);
153 CDEBUG(D_HA, "%s started\n", current->comm);
155 struct ptlrpc_request *request;
156 struct obd_import *import = NULL;
157 struct list_head *sending_list;
160 /* If we do not have enough pages available, allocate some */
161 while (atomic_read(&lcm->lcm_llcd_numfree) <
162 lcm->lcm_llcd_minfree) {
163 if (llcd_alloc() < 0)
        /* Park on the idle list while waiting for work or an exit flag. */
167 spin_lock(&lcm->lcm_thread_lock);
168 atomic_inc(&lcm->lcm_thread_numidle);
169 list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_idle);
170 spin_unlock(&lcm->lcm_thread_lock);
172 wait_event_interruptible(lcm->lcm_waitq,
173 !list_empty(&lcm->lcm_llcd_pending) ||
174 lcm->lcm_flags & LLOG_LCM_FL_EXIT);
176 /* If we are the last available thread, start a new one in case
177 * we get blocked on an RPC (nobody else will start a new one)*/
178 spin_lock(&lcm->lcm_thread_lock);
179 atomic_dec(&lcm->lcm_thread_numidle);
180 list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_busy);
181 spin_unlock(&lcm->lcm_thread_lock);
183 sending_list = &lcm->lcm_llcd_pending;
        /* On exit, stop caching free llcds and stop spawning threads. */
185 if (lcm->lcm_flags & LLOG_LCM_FL_EXIT) {
186 lcm->lcm_llcd_maxfree = 0;
187 lcm->lcm_llcd_minfree = 0;
188 lcm->lcm_thread_max = 0;
190 if (list_empty(&lcm->lcm_llcd_pending) ||
191 lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE)
195 if (atomic_read(&lcm->lcm_thread_numidle) <= 1 &&
196 atomic_read(&lcm->lcm_thread_total) < lcm->lcm_thread_max) {
197 rc = llog_start_commit_thread();
199 CERROR("error starting thread: rc %d\n", rc);
202 /* Move all of the pending cancels from the same OST off of
203 * the list, so we don't get multiple threads blocked and/or
204 * doing upcalls on the same OST in case of failure. */
205 spin_lock(&lcm->lcm_llcd_lock);
206 if (!list_empty(sending_list)) {
        /* Take the first llcd; its import defines this batch's target. */
207 list_move_tail(sending_list->next,
208 &lcd->lcd_llcd_list);
209 llcd = list_entry(lcd->lcd_llcd_list.next,
210 typeof(*llcd), llcd_list);
211 LASSERT(llcd->llcd_lcm == lcm);
212 import = llcd->llcd_import;
        /* Sweep the rest of sending_list for the same import. */
214 list_for_each_entry_safe(llcd, n, sending_list, llcd_list) {
215 LASSERT(llcd->llcd_lcm == lcm);
216 if (import == llcd->llcd_import)
217 list_move_tail(&llcd->llcd_list,
218 &lcd->lcd_llcd_list);
        /* When draining pending, also pick up resend entries for this
         * import (skip when sending_list already IS the resend list). */
220 if (sending_list != &lcm->lcm_llcd_resend) {
221 list_for_each_entry_safe(llcd, n, &lcm->lcm_llcd_resend,
223 LASSERT(llcd->llcd_lcm == lcm);
224 if (import == llcd->llcd_import)
225 list_move_tail(&llcd->llcd_list,
226 &lcd->lcd_llcd_list);
229 spin_unlock(&lcm->lcm_llcd_lock);
231 /* We are the only one manipulating our local list - no lock */
232 list_for_each_entry_safe(llcd,n, &lcd->lcd_llcd_list,llcd_list){
233 char *bufs[1] = {(char *)llcd->llcd_cookies};
234 list_del(&llcd->llcd_list);
236 request = ptlrpc_prep_req(import, OBD_LOG_CANCEL, 1,
237 &llcd->llcd_cookiebytes,
239 if (request == NULL) {
241 CERROR("error preparing commit: rc %d\n", rc);
        /* Could not build the RPC: push the whole local batch back onto
         * the resend list for a later attempt. */
243 spin_lock(&lcm->lcm_llcd_lock);
244 list_splice(&lcd->lcd_llcd_list,
245 &lcm->lcm_llcd_resend);
246 INIT_LIST_HEAD(&lcd->lcd_llcd_list);
247 spin_unlock(&lcm->lcm_llcd_lock);
        /* Cancel RPCs expect an empty reply body. */
251 request->rq_replen = lustre_msg_size(0, NULL);
252 rc = ptlrpc_queue_wait(request);
253 ptlrpc_req_finished(request);
255 /* If the RPC failed, we put this and the remaining
256 * messages onto the resend list for another time. */
262 spin_lock(&lcm->lcm_llcd_lock);
263 list_splice(&lcd->lcd_llcd_list, &lcm->lcm_llcd_resend);
264 if (++llcd->llcd_tries < 5) {
265 CERROR("commit %p failed %dx: rc %d\n",
266 llcd, llcd->llcd_tries, rc);
268 list_add_tail(&llcd->llcd_list,
269 &lcm->lcm_llcd_resend);
270 spin_unlock(&lcm->lcm_llcd_lock);
        /* 5 failures: give up and drop this llcd's cookies. */
272 spin_unlock(&lcm->lcm_llcd_lock);
273 CERROR("commit %p dropped %d cookies: rc %d\n",
274 llcd, (int)(llcd->llcd_cookiebytes /
275 sizeof(*llcd->llcd_cookies)),
        /* Next iteration prefers the resend list when it is non-empty. */
283 sending_list = &lcm->lcm_llcd_resend;
284 if (!list_empty(sending_list))
289 /* If we are force exiting, just drop all of the cookies. */
290 if (lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE) {
291 spin_lock(&lcm->lcm_llcd_lock);
292 list_splice(&lcm->lcm_llcd_pending, &lcd->lcd_llcd_list);
293 list_splice(&lcm->lcm_llcd_resend, &lcd->lcd_llcd_list);
294 list_splice(&lcm->lcm_llcd_free, &lcd->lcd_llcd_list);
295 spin_unlock(&lcm->lcm_llcd_lock);
297 list_for_each_entry_safe(llcd, n, &lcd->lcd_llcd_list,llcd_list)
301 OBD_FREE(lcd, sizeof(*lcd));
        /* Tear down this daemon's bookkeeping and wake anyone waiting in
         * llog_cleanup_commit_master for lcm_thread_total to hit 0. */
303 spin_lock(&lcm->lcm_thread_lock);
304 atomic_dec(&lcm->lcm_thread_total);
305 spin_unlock(&lcm->lcm_thread_lock);
306 wake_up(&lcm->lcm_waitq);
308 CDEBUG(D_HA, "%s exiting\n", current->comm);
/* Spawn one more log_commit_thread daemon via kernel_thread(), unless
 * lcm_thread_max daemons already exist.  CLONE_VM | CLONE_FILES: the
 * daemon shares address space and file table with its creator.
 * NOTE(review): lines 313-316, 318-319, 321, 324-328 are missing from
 * this listing — presumably the declaration of rc, the early-return for
 * the at-max case, the rc < 0 check and the final RETURN. */
312 int llog_start_commit_thread(void)
317 if (atomic_read(&lcm->lcm_thread_total) >= lcm->lcm_thread_max)
320 rc = kernel_thread(log_commit_thread, lcm, CLONE_VM | CLONE_FILES);
322 CERROR("error starting thread #%d: %d\n",
323 atomic_read(&lcm->lcm_thread_total), rc);
329 EXPORT_SYMBOL(llog_start_commit_thread);
/* One-time initialization of the global commit master: empty thread and
 * llcd lists, zeroed counters, daemon cap of 5 threads.
 * NOTE(review): lines 332 and 345-347 are missing from this listing —
 * initialization of fields such as lcm_llcd_maxfree / lcm_thread_total
 * and the return statement are presumably among the dropped lines. */
331 int llog_init_commit_master(void)
333 INIT_LIST_HEAD(&lcm->lcm_thread_busy);
334 INIT_LIST_HEAD(&lcm->lcm_thread_idle);
335 spin_lock_init(&lcm->lcm_thread_lock);
336 atomic_set(&lcm->lcm_thread_numidle, 0);
337 init_waitqueue_head(&lcm->lcm_waitq);
338 INIT_LIST_HEAD(&lcm->lcm_llcd_pending);
339 INIT_LIST_HEAD(&lcm->lcm_llcd_resend);
340 INIT_LIST_HEAD(&lcm->lcm_llcd_free);
341 spin_lock_init(&lcm->lcm_llcd_lock);
342 atomic_set(&lcm->lcm_llcd_numfree, 0);
343 lcm->lcm_llcd_minfree = 0;
344 lcm->lcm_thread_max = 5;
348 int llog_cleanup_commit_master(int force)
350 lcm->lcm_flags |= LLOG_LCM_FL_EXIT;
352 lcm->lcm_flags |= LLOG_LCM_FL_EXIT_FORCE;
353 wake_up(&lcm->lcm_waitq);
355 wait_event_interruptible(lcm->lcm_waitq,
356 atomic_read(&lcm->lcm_thread_total) == 0);