/* Source: fs/lustre-release.git — lustre/ptlrpc/recov_thread.c
 * (snapshot: "land v0.9.1 on HEAD, in preparation for a 1.0.x branch") */
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2003 Cluster File Systems, Inc.
5  *   Author: Andreas Dilger <adilger@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  * OST<->MDS recovery logging thread.
23  *
24  * Invariants in implementation:
25  * - we do not share logs among different OST<->MDS connections, so that
26  *   if an OST or MDS fails it need only look at log(s) relevant to itself
27  */
28
29 #define DEBUG_SUBSYSTEM S_LOG
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34
35 #ifdef __KERNEL__
36 #include <linux/fs.h>
37 #else
38 # include <portals/list.h>
39 # include <liblustre.h>
40 #endif
41
42 #include <linux/kp30.h>
43 #include <linux/obd_class.h>
44 #include <linux/lustre_commit_confd.h>
45 #include <linux/obd_support.h>
46 #include <linux/obd_class.h>
47 #include <linux/lustre_net.h>
48 #include <portals/types.h>
49 #include <portals/list.h>
50 #include <linux/lustre_log.h>
51 #include "ptlrpc_internal.h"
52
53 #ifdef __KERNEL__
54
55 static struct llog_commit_master lustre_lcm;
56 static struct llog_commit_master *lcm = &lustre_lcm;
57
58 /* Allocate new commit structs in case we do not have enough */
59 static int llcd_alloc(void)
60 {
61         struct llog_canceld_ctxt *llcd;
62         int offset = offsetof(struct llog_canceld_ctxt, llcd_cookies);
63
64         OBD_ALLOC(llcd, PAGE_SIZE + offset);
65         if (llcd == NULL)
66                 return -ENOMEM;
67
68         llcd->llcd_lcm = lcm;
69
70         spin_lock(&lcm->lcm_llcd_lock);
71         list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);
72         atomic_inc(&lcm->lcm_llcd_numfree);
73         spin_unlock(&lcm->lcm_llcd_lock);
74
75         return 0;
76 }
77
78 /* Get a free cookie struct from the list */
79 struct llog_canceld_ctxt *llcd_grab(void)
80 {
81         struct llog_canceld_ctxt *llcd;
82
83         spin_lock(&lcm->lcm_llcd_lock);
84         if (list_empty(&lcm->lcm_llcd_free)) {
85                 spin_unlock(&lcm->lcm_llcd_lock);
86                 if (llcd_alloc() < 0) {
87                         CERROR("unable to allocate log commit data!\n");
88                         return NULL;
89                 }
90                 spin_lock(&lcm->lcm_llcd_lock);
91         }
92
93         llcd = list_entry(lcm->lcm_llcd_free.next, typeof(*llcd), llcd_list);
94         list_del(&llcd->llcd_list);
95         atomic_dec(&lcm->lcm_llcd_numfree);
96         spin_unlock(&lcm->lcm_llcd_lock);
97
98         llcd->llcd_tries = 0;
99         llcd->llcd_cookiebytes = 0;
100
101         return llcd;
102 }
103 EXPORT_SYMBOL(llcd_grab);
104
105 static void llcd_put(struct llog_canceld_ctxt *llcd)
106 {
107         int offset = offsetof(struct llog_canceld_ctxt, llcd_cookies);
108
109         if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) {
110                 OBD_FREE(llcd, PAGE_SIZE + offset);
111         } else {
112                 spin_lock(&lcm->lcm_llcd_lock);
113                 list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);
114                 atomic_inc(&lcm->lcm_llcd_numfree);
115                 spin_unlock(&lcm->lcm_llcd_lock);
116         }
117 }
118
119 /* Send some cookies to the appropriate target */
120 void llcd_send(struct llog_canceld_ctxt *llcd)
121 {
122         spin_lock(&llcd->llcd_lcm->lcm_llcd_lock);
123         list_add_tail(&llcd->llcd_list, &llcd->llcd_lcm->lcm_llcd_pending);
124         spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock);
125
126         wake_up_nr(&llcd->llcd_lcm->lcm_waitq, 1);
127 }
128 EXPORT_SYMBOL(llcd_send);
129
/* Deleted objects have a commit callback that cancels the MDS
 * log record for the deletion.  The commit callback calls this
 * function.
 *
 * Cookies are batched into the per-context llcd (one page of cookie
 * space) and handed to llcd_send() when the page fills up or when
 * OBD_LLOG_FL_SENDNOW is set.  A call with count == 0 or
 * cookies == NULL is a flush request: it sends the current batch only
 * if SENDNOW is given.  Returns 0 or -ENOMEM.
 *
 * NOTE(review): only one cookie is copied regardless of 'count' —
 * presumably callers always pass count == 1; confirm before relying on
 * larger counts.
 */
int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
                         struct lov_stripe_md *lsm, int count,
                         struct llog_cookie *cookies, int flags)
{
        struct llog_canceld_ctxt *llcd;
        int rc = 0;
        ENTRY;

        LASSERT(ctxt);

        if (count == 0 || cookies == NULL) {
                /* Flush-only path: nothing to append. */
                down(&ctxt->loc_sem);
                if (ctxt->loc_llcd == NULL || !(flags & OBD_LLOG_FL_SENDNOW))
                        GOTO(out, rc);

                llcd = ctxt->loc_llcd;
                GOTO(send_now, rc);
        }

        down(&ctxt->loc_sem);
        llcd = ctxt->loc_llcd;
        if (llcd == NULL) {
                /* No batch in progress: start one, bound to this context's
                 * import and generation. */
                llcd = llcd_grab();
                if (llcd == NULL) {
                        CERROR("couldn't get an llcd - dropped "LPX64":%x+%u\n",
                               cookies->lgc_lgl.lgl_oid,
                               cookies->lgc_lgl.lgl_ogen, cookies->lgc_index);
                        GOTO(out, rc = -ENOMEM);
                }
                llcd->llcd_import = ctxt->loc_imp;
                llcd->llcd_gen = ctxt->loc_gen;
                ctxt->loc_llcd = llcd;
        }

        /* Append one cookie; the send_now check below ensures there is
         * always room for at least one more before we get here again. */
        memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes, cookies,
               sizeof(*cookies));
        llcd->llcd_cookiebytes += sizeof(*cookies);

send_now:
        /* Ship the batch when the page is (nearly) full or forced. */
        if ((PAGE_SIZE - llcd->llcd_cookiebytes < sizeof(*cookies) ||
             flags & OBD_LLOG_FL_SENDNOW)) {
                CDEBUG(D_HA, "send llcd: %p\n", llcd);
                ctxt->loc_llcd = NULL;
                llcd_send(llcd);
        }
out:
        up(&ctxt->loc_sem);
        return rc;
}
EXPORT_SYMBOL(llog_obd_repl_cancel);
184
/* Flush or discard the pending cancel cookies for @ctxt.
 *
 * If @exp identifies this context's reverse import, the connection is
 * going away, so the current llcd is dropped without sending; otherwise
 * the pending cookies are forced out via llog_cancel(SENDNOW).
 *
 * NOTE(review): this function calls up(&ctxt->loc_sem) on both paths but
 * never down()s it — it appears to rely on the caller holding loc_sem on
 * entry; confirm against the callers before touching the locking here.
 */
int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp)
{
        int rc = 0;
        ENTRY;

        LASSERT(ctxt->loc_llcd);

        if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {
                /* Reverse import is being destroyed: discard, don't send. */
                CWARN("import will be destroyed, put llcd %p\n", 
                      ctxt->loc_llcd);
                llcd_put(ctxt->loc_llcd);
                ctxt->loc_llcd = NULL;
                up(&ctxt->loc_sem);
        } else {
                /* Drop the sem before llog_cancel(): it takes loc_sem
                 * itself. */
                up(&ctxt->loc_sem);
                rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW);
        }

        RETURN(rc);
}
EXPORT_SYMBOL(llog_obd_repl_sync);
206
/* Commit daemon main loop.
 *
 * Each daemon repeatedly: tops up the llcd free list, parks itself on the
 * idle list until cancel batches arrive (or exit is requested), pulls all
 * pending batches destined for one import onto its private list, and sends
 * them via OBD_LOG_CANCEL RPCs.  On RPC-prep failure the remaining batches
 * are pushed to the resend list for a later pass.  Exits when
 * LLOG_LCM_FL_EXIT is set and the pending work is drained (or immediately
 * on LLOG_LCM_FL_EXIT_FORCE, dropping all queued cookies). */
static int log_commit_thread(void *arg)
{
        struct llog_commit_master *lcm = arg;
        struct llog_commit_daemon *lcd;
        struct llog_canceld_ctxt *llcd, *n;
        unsigned long flags;
        ENTRY;

        OBD_ALLOC(lcd, sizeof(*lcd));
        if (lcd == NULL)
                RETURN(-ENOMEM);

        lock_kernel();
        ptlrpc_daemonize(); /* thread never needs to do IO */

        /* Block every signal: this thread is only controlled through
         * lcm_flags / lcm_waitq. */
        SIGNAL_MASK_LOCK(current, flags);
        sigfillset(&current->blocked);
        RECALC_SIGPENDING;
        SIGNAL_MASK_UNLOCK(current, flags);

        spin_lock(&lcm->lcm_thread_lock);
        THREAD_NAME(current->comm, "ll_log_commit_%d",
                    atomic_read(&lcm->lcm_thread_total));
        atomic_inc(&lcm->lcm_thread_total);
        spin_unlock(&lcm->lcm_thread_lock);
        unlock_kernel();

        INIT_LIST_HEAD(&lcd->lcd_lcm_list);
        INIT_LIST_HEAD(&lcd->lcd_llcd_list);
        lcd->lcd_lcm = lcm;

        CDEBUG(D_HA, "%s started\n", current->comm);
        do {
                struct ptlrpc_request *request;
                struct obd_import *import = NULL;
                struct list_head *sending_list;
                int rc = 0;

                /* If we do not have enough pages available, allocate some */
                while (atomic_read(&lcm->lcm_llcd_numfree) <
                       lcm->lcm_llcd_minfree) {
                        if (llcd_alloc() < 0)
                                break;
                }

                /* Park on the idle list while waiting for work. */
                spin_lock(&lcm->lcm_thread_lock);
                atomic_inc(&lcm->lcm_thread_numidle);
                list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_idle);
                spin_unlock(&lcm->lcm_thread_lock);

                wait_event_interruptible(lcm->lcm_waitq,
                                         !list_empty(&lcm->lcm_llcd_pending) ||
                                         lcm->lcm_flags & LLOG_LCM_FL_EXIT);

                /* If we are the last available thread, start a new one in case
                 * we get blocked on an RPC (nobody else will start a new one)*/
                spin_lock(&lcm->lcm_thread_lock);
                atomic_dec(&lcm->lcm_thread_numidle);
                list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_busy);
                spin_unlock(&lcm->lcm_thread_lock);

                sending_list = &lcm->lcm_llcd_pending;
        resend:
                if (lcm->lcm_flags & LLOG_LCM_FL_EXIT) {
                        /* Stop caching/spawning so the pool can drain. */
                        lcm->lcm_llcd_maxfree = 0;
                        lcm->lcm_llcd_minfree = 0;
                        lcm->lcm_thread_max = 0;

                        if (list_empty(&lcm->lcm_llcd_pending) ||
                            lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE)
                                break;
                }

                if (atomic_read(&lcm->lcm_thread_numidle) <= 1 &&
                    atomic_read(&lcm->lcm_thread_total) < lcm->lcm_thread_max) {
                        rc = llog_start_commit_thread();
                        if (rc < 0)
                                CERROR("error starting thread: rc %d\n", rc);
                }

                /* Move all of the pending cancels from the same OST off of
                 * the list, so we don't get multiple threads blocked and/or
                 * doing upcalls on the same OST in case of failure. */
                spin_lock(&lcm->lcm_llcd_lock);
                if (!list_empty(sending_list)) {
                        /* First entry picks the import this pass works on. */
                        list_move_tail(sending_list->next,
                                       &lcd->lcd_llcd_list);
                        llcd = list_entry(lcd->lcd_llcd_list.next,
                                          typeof(*llcd), llcd_list);
                        LASSERT(llcd->llcd_lcm == lcm);
                        import = llcd->llcd_import;
                }
                /* Claim every other batch bound for the same import. */
                list_for_each_entry_safe(llcd, n, sending_list, llcd_list) {
                        LASSERT(llcd->llcd_lcm == lcm);
                        if (import == llcd->llcd_import)
                                list_move_tail(&llcd->llcd_list,
                                               &lcd->lcd_llcd_list);
                }
                /* Also pick up matching batches parked on the resend list
                 * (unless that is already the list we are draining). */
                if (sending_list != &lcm->lcm_llcd_resend) {
                        list_for_each_entry_safe(llcd, n, &lcm->lcm_llcd_resend,
                                                 llcd_list) {
                                LASSERT(llcd->llcd_lcm == lcm);
                                if (import == llcd->llcd_import)
                                        list_move_tail(&llcd->llcd_list,
                                                       &lcd->lcd_llcd_list);
                        }
                }
                spin_unlock(&lcm->lcm_llcd_lock);

                /* We are the only one manipulating our local list - no lock */
                list_for_each_entry_safe(llcd,n, &lcd->lcd_llcd_list,llcd_list){
                        char *bufs[1] = {(char *)llcd->llcd_cookies};
                        struct obd_device *obd = import->imp_obd;
                        struct llog_ctxt *ctxt;

                        list_del(&llcd->llcd_list);
                        if (llcd->llcd_cookiebytes == 0) {
                                CDEBUG(D_HA, "just put empty llcd %p\n", llcd);
                                llcd_put(llcd);
                                continue;
                        }
                        /* check whether the cookies are new. if new then send, otherwise
                         * just put llcd */
                        ctxt = llog_get_context(obd, llcd->llcd_cookies[0].lgc_subsys + 1);
                        LASSERT(ctxt != NULL);
                        down(&ctxt->loc_sem);
                        if (log_gen_lt(llcd->llcd_gen, ctxt->loc_gen)) {
                                /* Batch predates a reconnection: stale. */
                                up(&ctxt->loc_sem); 
                                CDEBUG(D_HA, "just put stale llcd %p\n", llcd);
                                llcd_put(llcd);
                                continue;
                        }
                        up(&ctxt->loc_sem); 

                        request = ptlrpc_prep_req(import, OBD_LOG_CANCEL, 1,
                                                  &llcd->llcd_cookiebytes,
                                                  bufs);
                        if (request == NULL) {
                                /* Can't build the RPC: park everything we
                                 * claimed on the resend list and retry on a
                                 * later pass. */
                                rc = -ENOMEM;
                                CERROR("error preparing commit: rc %d\n", rc);

                                spin_lock(&lcm->lcm_llcd_lock);
                                list_splice(&lcd->lcd_llcd_list,
                                            &lcm->lcm_llcd_resend);
                                INIT_LIST_HEAD(&lcd->lcd_llcd_list);
                                spin_unlock(&lcm->lcm_llcd_lock);
                                break;
                        }

                        request->rq_replen = lustre_msg_size(0, NULL);
                        rc = ptlrpc_queue_wait(request);
                        ptlrpc_req_finished(request);

                        /* If the RPC failed, we put this and the remaining
                         * messages onto the resend list for another time. */
                        if (rc == 0) {
                                llcd_put(llcd);
                                continue;
                        }

#if 0                   /* FIXME just put llcd, not send it again */
                        spin_lock(&lcm->lcm_llcd_lock);
                        list_splice(&lcd->lcd_llcd_list, &lcm->lcm_llcd_resend);
                        if (++llcd->llcd_tries < 5) {
                                CERROR("commit %p failed on attempt %d: rc %d\n",
                                       llcd, llcd->llcd_tries, rc);

                                list_add_tail(&llcd->llcd_list,
                                              &lcm->lcm_llcd_resend);
                                spin_unlock(&lcm->lcm_llcd_lock);
                        } else {
                                spin_unlock(&lcm->lcm_llcd_lock);
#endif
                                /* Retry logic above is disabled: a failed
                                 * RPC just drops this batch's cookies. */
                                CERROR("commit %p dropped %d cookies: rc %d\n",
                                       llcd, (int)(llcd->llcd_cookiebytes /
                                                   sizeof(*llcd->llcd_cookies)),
                                       rc);
                                llcd_put(llcd);
//                        }
                        break;
                }

                /* After a clean pass, drain the resend list too. */
                if (rc == 0) {
                        sending_list = &lcm->lcm_llcd_resend;
                        if (!list_empty(sending_list))
                                goto resend;
                }
        } while(1);

        /* If we are force exiting, just drop all of the cookies. */
        if (lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE) {
                spin_lock(&lcm->lcm_llcd_lock);
                list_splice(&lcm->lcm_llcd_pending, &lcd->lcd_llcd_list);
                list_splice(&lcm->lcm_llcd_resend, &lcd->lcd_llcd_list);
                list_splice(&lcm->lcm_llcd_free, &lcd->lcd_llcd_list);
                spin_unlock(&lcm->lcm_llcd_lock);

                /* maxfree was zeroed above, so llcd_put() frees each one. */
                list_for_each_entry_safe(llcd, n, &lcd->lcd_llcd_list,llcd_list)
                        llcd_put(llcd);
        }

        spin_lock(&lcm->lcm_thread_lock);
        list_del(&lcd->lcd_lcm_list);
        spin_unlock(&lcm->lcm_thread_lock);
        OBD_FREE(lcd, sizeof(*lcd));

        /* Last: announce our exit so llog_cleanup_commit_master() can
         * finish waiting. */
        spin_lock(&lcm->lcm_thread_lock);
        atomic_dec(&lcm->lcm_thread_total);
        spin_unlock(&lcm->lcm_thread_lock);
        wake_up(&lcm->lcm_waitq);

        CDEBUG(D_HA, "%s exiting\n", current->comm);
        return 0;
}
421
422 int llog_start_commit_thread(void)
423 {
424         int rc;
425         ENTRY;
426
427         if (atomic_read(&lcm->lcm_thread_total) >= lcm->lcm_thread_max)
428                 RETURN(0);
429
430         rc = kernel_thread(log_commit_thread, lcm, CLONE_VM | CLONE_FILES);
431         if (rc < 0) {
432                 CERROR("error starting thread #%d: %d\n",
433                        atomic_read(&lcm->lcm_thread_total), rc);
434                 RETURN(rc);
435         }
436
437         RETURN(0);
438 }
439 EXPORT_SYMBOL(llog_start_commit_thread);
440
/* Argument hand-off area between llog_recovery_generic() and the spawned
 * log_process_thread().  llpa_sem serializes producers: it is taken before
 * the fields are filled in and released by the new thread once it has
 * copied them out. */
static struct llog_process_args {
        struct semaphore         llpa_sem; 
        struct llog_ctxt        *llpa_ctxt;   /* context to recover */
        void                    *llpa_cb;     /* llog_cb_t record callback */
        void                    *llpa_arg;    /* struct llog_logid * to open */
} llpa;
/* One-time initialization of the global commit master: thread lists, llcd
 * lists, their locks, the wait queue and the tuning knobs used by the
 * commit daemons.  Always returns 0. */
int llog_init_commit_master(void)
{
        INIT_LIST_HEAD(&lcm->lcm_thread_busy);
        INIT_LIST_HEAD(&lcm->lcm_thread_idle);
        spin_lock_init(&lcm->lcm_thread_lock);
        atomic_set(&lcm->lcm_thread_numidle, 0);
        init_waitqueue_head(&lcm->lcm_waitq);
        INIT_LIST_HEAD(&lcm->lcm_llcd_pending);
        INIT_LIST_HEAD(&lcm->lcm_llcd_resend);
        INIT_LIST_HEAD(&lcm->lcm_llcd_free);
        spin_lock_init(&lcm->lcm_llcd_lock);
        atomic_set(&lcm->lcm_llcd_numfree, 0);
        /* NOTE(review): lcm_llcd_minfree (and, implicitly, lcm_llcd_maxfree
         * via static zero-init) are 0 here, so daemons never pre-allocate
         * and llcd_put() always frees — confirm these are tuned elsewhere. */
        lcm->lcm_llcd_minfree = 0;
        lcm->lcm_thread_max = 5;
        /* FIXME initialize semaphore for llog_process_args */
        sema_init(&llpa.llpa_sem, 1);
        return 0;
}
465
466 int llog_cleanup_commit_master(int force)
467 {
468         lcm->lcm_flags |= LLOG_LCM_FL_EXIT;
469         if (force)
470                 lcm->lcm_flags |= LLOG_LCM_FL_EXIT_FORCE;
471         wake_up(&lcm->lcm_waitq);
472
473         wait_event_interruptible(lcm->lcm_waitq,
474                                  atomic_read(&lcm->lcm_thread_total) == 0);
475         return 0;
476 }
477
478
479 static int log_process_thread(void *args)
480 {
481         struct llog_process_args *data = args;
482         struct llog_ctxt *ctxt = data->llpa_ctxt;
483         void   *cb = data->llpa_cb;
484         struct llog_logid logid = *(struct llog_logid *)(data->llpa_arg);
485         struct llog_handle *llh = NULL;
486         unsigned long flags;
487         int rc;
488         ENTRY;
489                                                                                                                              
490         up(&data->llpa_sem);
491         lock_kernel();
492         ptlrpc_daemonize(); /* thread never needs to do IO */
493                                                                                                                              
494         SIGNAL_MASK_LOCK(current, flags);
495         sigfillset(&current->blocked);
496         RECALC_SIGPENDING;
497         SIGNAL_MASK_UNLOCK(current, flags);
498         unlock_kernel();
499                                                                                                                              
500         rc = llog_create(ctxt, &llh, &logid, NULL);
501         if (rc) {
502                 CERROR("llog_create failed %d\n", rc);
503                 RETURN(rc);
504         }
505         rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL);
506         if (rc) {
507                 CERROR("llog_init_handle failed %d\n", rc);
508                 GOTO(out, rc);
509         }
510                                                                                                                              
511         if (cb) {
512                 rc = llog_cat_process(llh, (llog_cb_t)cb, NULL);
513                 if (rc)
514                         CERROR("llog_cat_process failed %d\n", rc);
515         } else
516                 CWARN("no callback function for recovery\n");
517
518         CDEBUG(D_HA, "send to llcd :%p forcibly\n", ctxt->loc_llcd);
519         llog_sync(ctxt, NULL);
520 out:
521         rc = llog_cat_put(llh);
522         if (rc)
523                 CERROR("llog_cat_put failed %d\n", rc);
524                                                                                                                              
525         RETURN(rc);
526 }
/* Hand off (ctxt, callback, arg) to a freshly spawned log_process_thread().
 *
 * llpa is a single global, so llpa_sem is taken here to protect it; the
 * matching up() is performed by log_process_thread() once it has copied
 * the fields out.  Returns 0 on success or the kernel_thread() error. */
static int llog_recovery_generic(struct llog_ctxt *ctxt,
                                 void *handle,
                                 void *arg)
{
        int rc;
        ENTRY;

        down(&llpa.llpa_sem);
        llpa.llpa_ctxt = ctxt;
        llpa.llpa_cb = handle;
        llpa.llpa_arg = arg;

        /* kernel_thread() returns the new pid (> 0) on success. */
        rc = kernel_thread(log_process_thread, &llpa, CLONE_VM | CLONE_FILES);
        if (rc < 0)
                CERROR("error starting log_process_thread: %d\n", rc);
        else {
                CDEBUG(D_HA, "log_process_thread: %d\n", rc);
                rc = 0;
        }

        RETURN(rc);
}
549 int llog_repl_connect(struct llog_ctxt *ctxt, int count,
550                       struct llog_logid *logid, struct llog_ctxt_gen *gen)
551 {
552         struct llog_canceld_ctxt *llcd;
553         int rc;
554         ENTRY;
555                                                                                                                              
556         down(&ctxt->loc_sem);
557         ctxt->loc_gen = *gen;
558         llcd = ctxt->loc_llcd;
559         if (llcd) {
560                 CDEBUG(D_HA, "put current llcd when new connection arrives\n");
561                 llcd_put(llcd);
562         }
563         llcd = llcd_grab();
564         if (llcd == NULL) {
565                 CERROR("couldn't get an llcd\n");
566                 RETURN(-ENOMEM);
567         }
568         llcd->llcd_import = ctxt->loc_imp;
569         llcd->llcd_gen = ctxt->loc_gen;
570         ctxt->loc_llcd = llcd;
571         up(&ctxt->loc_sem);
572
573         rc = llog_recovery_generic(ctxt, ctxt->llog_proc_cb, logid); 
574         if (rc != 0)
575                 CERROR("error recovery process: %d\n", rc);
576
577         RETURN(rc);
578 }
579 EXPORT_SYMBOL(llog_repl_connect);
580
581 #else /* !__KERNEL__ */
582
/* Userspace (liblustre) stub: replicated-log cancellation is a kernel-only
 * facility, so outside __KERNEL__ this is a no-op that reports success. */
int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
                         struct lov_stripe_md *lsm, int count,
                         struct llog_cookie *cookies, int flags)
{
        return 0;
}
589 #endif