lustre/ptlrpc/recov_thread.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (C) 2003 Cluster File Systems, Inc.
 *   Author: Andreas Dilger <adilger@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 *
 * OST<->MDS recovery logging thread.
 *
 * Invariants in implementation:
 * - we do not share logs among different OST<->MDS connections, so that
 *   if an OST or MDS fails it need only look at log(s) relevant to itself
 */

#define DEBUG_SUBSYSTEM S_LOG

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif

#ifdef __KERNEL__
# include <libcfs/libcfs.h>
#else
# include <libcfs/list.h>
# include <liblustre.h>
#endif

#include <obd_class.h>
#include <lustre_commit_confd.h>
#include <obd_support.h>
#include <lustre_net.h>
#include <lnet/types.h>
#include <libcfs/list.h>
#include <lustre_log.h>
#include "ptlrpc_internal.h"

#ifdef __KERNEL__

/* Allocate new commit structs in case we do not have enough.
 * Make the llcd size small enough that it fits into a single page when we
 * are sending/receiving it. */
static int llcd_alloc(struct llog_commit_master *lcm)
{
        struct llog_canceld_ctxt *llcd;
        int llcd_size;

        /* payload of lustre_msg V2 is bigger */
        llcd_size = 4096 - lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL);
        OBD_ALLOC(llcd,
                  llcd_size + offsetof(struct llog_canceld_ctxt, llcd_cookies));
        if (llcd == NULL)
                return -ENOMEM;

        llcd->llcd_size = llcd_size;
        llcd->llcd_lcm = lcm;

        spin_lock(&lcm->lcm_llcd_lock);
        list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);
        atomic_inc(&lcm->lcm_llcd_numfree);
        spin_unlock(&lcm->lcm_llcd_lock);

        return 0;
}

/* Get a free cookie struct from the list */
static struct llog_canceld_ctxt *llcd_grab(struct llog_commit_master *lcm)
{
        struct llog_canceld_ctxt *llcd;

repeat:
        spin_lock(&lcm->lcm_llcd_lock);
        if (list_empty(&lcm->lcm_llcd_free)) {
                spin_unlock(&lcm->lcm_llcd_lock);
                if (llcd_alloc(lcm) < 0) {
                        CERROR("unable to allocate log commit data!\n");
                        return NULL;
                }
                /* check new llcd wasn't grabbed while lock dropped, b=7407 */
                goto repeat;
        }

        llcd = list_entry(lcm->lcm_llcd_free.next, typeof(*llcd), llcd_list);
        list_del(&llcd->llcd_list);
        atomic_dec(&lcm->lcm_llcd_numfree);
        spin_unlock(&lcm->lcm_llcd_lock);

        llcd->llcd_cookiebytes = 0;

        return llcd;
}

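/*
 * Return an llcd to the free pool, or free it outright if the pool has
 * already reached lcm_llcd_maxfree. Also drops the reference on the
 * llcd's llog context taken when the llcd was attached to it.
 */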
static void llcd_put(struct llog_canceld_ctxt *llcd)
{
        struct llog_commit_master *lcm = llcd->llcd_lcm;

        llog_ctxt_put(llcd->llcd_ctxt);
        if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) {
                int llcd_size = llcd->llcd_size +
                         offsetof(struct llog_canceld_ctxt, llcd_cookies);
                OBD_FREE(llcd, llcd_size);
        } else {
                spin_lock(&lcm->lcm_llcd_lock);
                list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);
                atomic_inc(&lcm->lcm_llcd_numfree);
                spin_unlock(&lcm->lcm_llcd_lock);
        }
}

/* Send some cookies to the appropriate target */
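/*
 * Queue the llcd on the commit master's pending list and wake one commit
 * thread to process it. If the commit master is already exiting
 * (LLOG_LCM_FL_EXIT), the llcd is not queued; only the waitqueue is
 * signalled.
 */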
static void llcd_send(struct llog_canceld_ctxt *llcd)
{
        if (!(llcd->llcd_lcm->lcm_flags & LLOG_LCM_FL_EXIT)) {
                spin_lock(&llcd->llcd_lcm->lcm_llcd_lock);
                list_add_tail(&llcd->llcd_list,
                              &llcd->llcd_lcm->lcm_llcd_pending);
                spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock);
        }
        cfs_waitq_signal_nr(&llcd->llcd_lcm->lcm_waitq, 1);
}

/**
 * Grab llcd and assign it to passed @ctxt. Also set up backward link
 * and get ref on @ctxt.
 */
static struct llog_canceld_ctxt *ctxt_llcd_grab(struct llog_ctxt *ctxt)
{
        struct llog_canceld_ctxt *llcd;

        LASSERT_SEM_LOCKED(&ctxt->loc_sem);
        llcd = llcd_grab(ctxt->loc_lcm);
        if (llcd == NULL)
                return NULL;

        llcd->llcd_ctxt = llog_ctxt_get(ctxt);
        ctxt->loc_llcd = llcd;

        CDEBUG(D_RPCTRACE, "grab llcd %p:%p\n", ctxt->loc_llcd, ctxt);
        return llcd;
}

/**
 * Put the llcd attached to passed @ctxt and set ->loc_llcd to NULL.
 * Also drop the reference on ctxt->loc_imp, if it is set.
 */
static void ctxt_llcd_put(struct llog_ctxt *ctxt)
{
        mutex_down(&ctxt->loc_sem);
        if (ctxt->loc_llcd != NULL) {
                CDEBUG(D_RPCTRACE, "put llcd %p:%p\n", ctxt->loc_llcd, ctxt);
                llcd_put(ctxt->loc_llcd);
                ctxt->loc_llcd = NULL;
        }
        if (ctxt->loc_imp) {
                class_import_put(ctxt->loc_imp);
                ctxt->loc_imp = NULL;
        }
        mutex_up(&ctxt->loc_sem);
}

/* Deleted objects have a commit callback that cancels the MDS
 * log record for the deletion.  The commit callback calls this
 * function.
 */
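/*
 * Cookies are accumulated in the context's current llcd; the llcd is
 * handed to llcd_send() (and from there to a commit thread) once it is
 * nearly full or when OBD_LLOG_FL_SENDNOW is passed.
 */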
int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
                         struct lov_stripe_md *lsm, int count,
                         struct llog_cookie *cookies, int flags)
{
        struct llog_canceld_ctxt *llcd;
        int rc = 0;
        ENTRY;

        LASSERT(ctxt);

        mutex_down(&ctxt->loc_sem);
        llcd = ctxt->loc_llcd;

        if (ctxt->loc_imp == NULL) {
                CDEBUG(D_RPCTRACE, "no import for ctxt %p\n", ctxt);
                GOTO(out, rc = 0);
        }

        if (count > 0 && cookies != NULL) {
                if (llcd == NULL) {
                        llcd = ctxt_llcd_grab(ctxt);
                        if (llcd == NULL) {
                                CERROR("couldn't get an llcd - dropped "LPX64
                                       ":%x+%u\n",
                                       cookies->lgc_lgl.lgl_oid,
                                       cookies->lgc_lgl.lgl_ogen,
                                       cookies->lgc_index);
                                GOTO(out, rc = -ENOMEM);
                        }
                }

                memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes,
                       cookies, sizeof(*cookies));
                llcd->llcd_cookiebytes += sizeof(*cookies);
        } else {
                if (llcd == NULL || !(flags & OBD_LLOG_FL_SENDNOW))
                        GOTO(out, rc);
        }

        if ((llcd->llcd_size - llcd->llcd_cookiebytes) < sizeof(*cookies) ||
            (flags & OBD_LLOG_FL_SENDNOW)) {
                CDEBUG(D_RPCTRACE, "send llcd %p:%p\n", llcd, llcd->llcd_ctxt);
                ctxt->loc_llcd = NULL;
                llcd_send(llcd);
        }
out:
        mutex_up(&ctxt->loc_sem);
        return rc;
}
EXPORT_SYMBOL(llog_obd_repl_cancel);

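/*
 * Flush the llcd state for @ctxt. When called for a disconnecting
 * reverse import (@exp matches ctxt->loc_imp), the currently attached
 * llcd is simply dropped; otherwise any pending cookies are pushed to
 * the sending list via llog_cancel() with OBD_LLOG_FL_SENDNOW.
 */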
int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp)
{
        int rc = 0;
        ENTRY;

        if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {
                CDEBUG(D_RPCTRACE, "reverse import disconnect\n");
                /*
                 * We put the llcd here because it is not going onto the
                 * sending list, so its refcount will not be handled by the
                 * recovery thread.
                 */
                ctxt_llcd_put(ctxt);
        } else {
                /*
                 * Sending cancel. This means that ctxt->loc_llcd will be
                 * put on the sending list in llog_obd_repl_cancel() and
                 * the recovery thread will take care of its refcount.
                 */
                rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW);
        }
        RETURN(rc);
}
EXPORT_SYMBOL(llog_obd_repl_sync);

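/*
 * Called when preparing a cancel RPC fails: report the error and move
 * this daemon's local llcd list back onto the commit master's resend
 * list so the cookies can be sent later.
 */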
static inline void stop_log_commit(struct llog_commit_master *lcm,
                                   struct llog_commit_daemon *lcd,
                                   int rc)
{
        CERROR("error preparing commit: rc %d\n", rc);

        spin_lock(&lcm->lcm_llcd_lock);
        list_splice_init(&lcd->lcd_llcd_list, &lcm->lcm_llcd_resend);
        spin_unlock(&lcm->lcm_llcd_lock);
}

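/*
 * Main loop of a log commit daemon. Each thread waits for llcds on the
 * pending list, groups those destined for the same import, sends them
 * as OBD_LOG_CANCEL RPCs, and retries from the resend list when an RPC
 * could not be prepared. Threads exit when LLOG_LCM_FL_EXIT is set.
 */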
static int log_commit_thread(void *arg)
{
        struct llog_commit_master *lcm = arg;
        struct llog_commit_daemon *lcd;
        struct llog_canceld_ctxt *llcd, *n;
        struct obd_import *import = NULL;
        ENTRY;

        OBD_ALLOC(lcd, sizeof(*lcd));
        if (lcd == NULL)
                RETURN(-ENOMEM);

        spin_lock(&lcm->lcm_thread_lock);
        THREAD_NAME(cfs_curproc_comm(), CFS_CURPROC_COMM_MAX - 1,
                    "ll_log_comt_%02d", atomic_read(&lcm->lcm_thread_total));
        atomic_inc(&lcm->lcm_thread_total);
        spin_unlock(&lcm->lcm_thread_lock);

        ptlrpc_daemonize(cfs_curproc_comm()); /* thread never needs to do IO */

        CFS_INIT_LIST_HEAD(&lcd->lcd_lcm_list);
        CFS_INIT_LIST_HEAD(&lcd->lcd_llcd_list);
        lcd->lcd_lcm = lcm;

        CDEBUG(D_HA, "%s started\n", cfs_curproc_comm());
        do {
                struct ptlrpc_request *request;
                struct list_head *sending_list;
                int rc = 0;

                if (import)
                        class_import_put(import);
                import = NULL;

                /* If we do not have enough pages available, allocate some */
                while (atomic_read(&lcm->lcm_llcd_numfree) <
                       lcm->lcm_llcd_minfree) {
                        if (llcd_alloc(lcm) < 0)
                                break;
                }

                spin_lock(&lcm->lcm_thread_lock);
                atomic_inc(&lcm->lcm_thread_numidle);
                list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_idle);
                spin_unlock(&lcm->lcm_thread_lock);

                wait_event_interruptible(lcm->lcm_waitq,
                                         !list_empty(&lcm->lcm_llcd_pending) ||
                                         lcm->lcm_flags & LLOG_LCM_FL_EXIT);

                /* If we are the last available thread, start a new one in case
                 * we get blocked on an RPC (nobody else will start a new one) */
                spin_lock(&lcm->lcm_thread_lock);
                atomic_dec(&lcm->lcm_thread_numidle);
                list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_busy);
                spin_unlock(&lcm->lcm_thread_lock);

                sending_list = &lcm->lcm_llcd_pending;
        resend:
                if (import)
                        class_import_put(import);
                import = NULL;
                if (lcm->lcm_flags & LLOG_LCM_FL_EXIT) {
                        lcm->lcm_llcd_maxfree = 0;
                        lcm->lcm_llcd_minfree = 0;
                        lcm->lcm_thread_max = 0;

                        if (list_empty(&lcm->lcm_llcd_pending) ||
                            lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE)
                                break;
                }

                if (atomic_read(&lcm->lcm_thread_numidle) <= 1 &&
                    atomic_read(&lcm->lcm_thread_total) < lcm->lcm_thread_max) {
                        rc = llog_start_commit_thread(lcm);
                        if (rc < 0)
                                CERROR("error starting thread: rc %d\n", rc);
                }

                /* Move all of the pending cancels from the same OST off of
                 * the list, so we don't get multiple threads blocked and/or
                 * doing upcalls on the same OST in case of failure. */
                spin_lock(&lcm->lcm_llcd_lock);
                if (!list_empty(sending_list)) {
                        list_move_tail(sending_list->next,
                                       &lcd->lcd_llcd_list);
                        llcd = list_entry(lcd->lcd_llcd_list.next,
                                          typeof(*llcd), llcd_list);
                        LASSERT(llcd->llcd_lcm == lcm);
                        import = llcd->llcd_ctxt->loc_imp;
                        if (import)
                                class_import_get(import);
                }
                list_for_each_entry_safe(llcd, n, sending_list, llcd_list) {
                        LASSERT(llcd->llcd_lcm == lcm);
                        if (import == llcd->llcd_ctxt->loc_imp)
                                list_move_tail(&llcd->llcd_list,
                                               &lcd->lcd_llcd_list);
                }
                if (sending_list != &lcm->lcm_llcd_resend) {
                        list_for_each_entry_safe(llcd, n, &lcm->lcm_llcd_resend,
                                                 llcd_list) {
                                LASSERT(llcd->llcd_lcm == lcm);
                                if (import == llcd->llcd_ctxt->loc_imp)
                                        list_move_tail(&llcd->llcd_list,
                                                       &lcd->lcd_llcd_list);
                        }
                }
                spin_unlock(&lcm->lcm_llcd_lock);

                /* We are the only one manipulating our local list - no lock */
                list_for_each_entry_safe(llcd, n, &lcd->lcd_llcd_list, llcd_list) {
                        char *bufs[2] = { NULL, (char *)llcd->llcd_cookies };

                        list_del(&llcd->llcd_list);
                        if (llcd->llcd_cookiebytes == 0) {
                                CDEBUG(D_RPCTRACE, "put empty llcd %p:%p\n",
                                       llcd, llcd->llcd_ctxt);
                                llcd_put(llcd);
                                continue;
                        }

                        mutex_down(&llcd->llcd_ctxt->loc_sem);
                        if (llcd->llcd_ctxt->loc_imp == NULL) {
                                mutex_up(&llcd->llcd_ctxt->loc_sem);
                                CWARN("import will be destroyed, put "
                                      "llcd %p:%p\n", llcd, llcd->llcd_ctxt);
                                llcd_put(llcd);
                                continue;
                        }
                        mutex_up(&llcd->llcd_ctxt->loc_sem);

                        if (!import || (import == LP_POISON) ||
                            (import->imp_client == LP_POISON)) {
                                CERROR("No import %p (llcd=%p, ctxt=%p)\n",
                                       import, llcd, llcd->llcd_ctxt);
                                llcd_put(llcd);
                                continue;
                        }

                        OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_RECOV, 10);

                        request = ptlrpc_request_alloc(import, &RQF_LOG_CANCEL);
                        if (request == NULL) {
                                rc = -ENOMEM;
                                stop_log_commit(lcm, lcd, rc);
                                break;
                        }

                        req_capsule_set_size(&request->rq_pill, &RMF_LOGCOOKIES,
                                             RCL_CLIENT, llcd->llcd_cookiebytes);

                        rc = ptlrpc_request_bufs_pack(request,
                                                      LUSTRE_LOG_VERSION,
                                                      OBD_LOG_CANCEL, bufs,
                                                      NULL);
                        if (rc) {
                                ptlrpc_request_free(request);
                                stop_log_commit(lcm, lcd, rc);
                                break;
                        }

                        /* bug 5515 */
                        request->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
                        request->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
                        ptlrpc_at_set_req_timeout(request);

                        ptlrpc_request_set_replen(request);
                        mutex_down(&llcd->llcd_ctxt->loc_sem);
                        if (llcd->llcd_ctxt->loc_imp == NULL) {
                                mutex_up(&llcd->llcd_ctxt->loc_sem);
                                CWARN("import will be destroyed, put "
                                      "llcd %p:%p\n", llcd, llcd->llcd_ctxt);
                                llcd_put(llcd);
                                ptlrpc_req_finished(request);
                                continue;
                        }
                        mutex_up(&llcd->llcd_ctxt->loc_sem);
                        rc = ptlrpc_queue_wait(request);
                        ptlrpc_req_finished(request);

                        /* If the RPC succeeded, we are done with this llcd;
                         * otherwise drop its cookies and report the failure. */
                        if (rc == 0) {
                                llcd_put(llcd);
                                continue;
                        }

                        CERROR("commit %p:%p drop %d cookies: rc %d\n",
                               llcd, llcd->llcd_ctxt,
                               (int)(llcd->llcd_cookiebytes /
                                     sizeof(*llcd->llcd_cookies)), rc);
                        llcd_put(llcd);
                }

                if (rc == 0) {
                        sending_list = &lcm->lcm_llcd_resend;
                        if (!list_empty(sending_list))
                                goto resend;
                }
        } while (1);

        if (import)
                class_import_put(import);

        /* If we are force exiting, just drop all of the cookies. */
        if (lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE) {
                spin_lock(&lcm->lcm_llcd_lock);
                list_splice_init(&lcm->lcm_llcd_pending, &lcd->lcd_llcd_list);
                list_splice_init(&lcm->lcm_llcd_resend, &lcd->lcd_llcd_list);
                list_splice_init(&lcm->lcm_llcd_free, &lcd->lcd_llcd_list);
                spin_unlock(&lcm->lcm_llcd_lock);

                list_for_each_entry_safe(llcd, n, &lcd->lcd_llcd_list, llcd_list)
                        llcd_put(llcd);
        }

        spin_lock(&lcm->lcm_thread_lock);
        list_del(&lcd->lcd_lcm_list);
        spin_unlock(&lcm->lcm_thread_lock);
        OBD_FREE(lcd, sizeof(*lcd));

        CDEBUG(D_HA, "%s exiting\n", cfs_curproc_comm());

        spin_lock(&lcm->lcm_thread_lock);
        atomic_dec(&lcm->lcm_thread_total);
        spin_unlock(&lcm->lcm_thread_lock);
        cfs_waitq_signal(&lcm->lcm_waitq);

        return 0;
}

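/*
 * Start an additional commit thread unless lcm_thread_max has already
 * been reached. Returns 0 on success (or when no new thread is needed)
 * and a negative errno if the kernel thread could not be created.
 */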
int llog_start_commit_thread(struct llog_commit_master *lcm)
{
        int rc;
        ENTRY;

        if (atomic_read(&lcm->lcm_thread_total) >= lcm->lcm_thread_max)
                RETURN(0);

        rc = cfs_kernel_thread(log_commit_thread, lcm, CLONE_VM | CLONE_FILES);
        if (rc < 0) {
                CERROR("error starting thread #%d: %d\n",
                       atomic_read(&lcm->lcm_thread_total), rc);
                RETURN(rc);
        }

        RETURN(0);
}
EXPORT_SYMBOL(llog_start_commit_thread);

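/*
 * Arguments handed to log_process_thread(). A single static instance is
 * shared between callers; llpa_sem serializes access until the thread
 * has copied what it needs and released the semaphore.
 */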
static struct llog_process_args {
        struct semaphore         llpa_sem;
        struct llog_ctxt        *llpa_ctxt;
        void                    *llpa_cb;
        void                    *llpa_arg;
} llpa;

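/*
 * Initialize a commit master: thread and llcd lists, locks, counters
 * and the wait queue used by the commit daemons.
 */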
int llog_init_commit_master(struct llog_commit_master *lcm)
{
        CFS_INIT_LIST_HEAD(&lcm->lcm_thread_busy);
        CFS_INIT_LIST_HEAD(&lcm->lcm_thread_idle);
        spin_lock_init(&lcm->lcm_thread_lock);
        atomic_set(&lcm->lcm_thread_numidle, 0);
        cfs_waitq_init(&lcm->lcm_waitq);
        CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_pending);
        CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_resend);
        CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_free);
        spin_lock_init(&lcm->lcm_llcd_lock);
        atomic_set(&lcm->lcm_llcd_numfree, 0);
        lcm->lcm_llcd_minfree = 0;
        lcm->lcm_thread_max = 5;
        /* FIXME initialize semaphore for llog_process_args */
        sema_init(&llpa.llpa_sem, 1);
        return 0;
}
EXPORT_SYMBOL(llog_init_commit_master);

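/*
 * Tell the commit threads to exit (optionally forcing them to drop any
 * queued cookies) and wait until the last thread has gone.
 */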
int llog_cleanup_commit_master(struct llog_commit_master *lcm,
                               int force)
{
        lcm->lcm_flags |= LLOG_LCM_FL_EXIT;
        if (force)
                lcm->lcm_flags |= LLOG_LCM_FL_EXIT_FORCE;
        cfs_waitq_signal(&lcm->lcm_waitq);

        wait_event_interruptible(lcm->lcm_waitq,
                                 atomic_read(&lcm->lcm_thread_total) == 0);
        return 0;
}
EXPORT_SYMBOL(llog_cleanup_commit_master);

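/*
 * Kernel thread that replays a catalog llog after recovery: it opens
 * the log named by llpa_arg, runs the supplied callback over every
 * record via llog_cat_process(), and finally forces the context's
 * pending cancel cookies out with llog_sync().
 */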
static int log_process_thread(void *args)
{
        struct llog_process_args *data = args;
        struct llog_ctxt *ctxt = data->llpa_ctxt;
        void   *cb = data->llpa_cb;
        struct llog_logid logid = *(struct llog_logid *)(data->llpa_arg);
        struct llog_handle *llh = NULL;
        int rc;
        ENTRY;

        mutex_up(&data->llpa_sem);
        ptlrpc_daemonize("llog_process");     /* thread does IO to log files */

        rc = llog_create(ctxt, &llh, &logid, NULL);
        if (rc) {
                CERROR("llog_create failed %d\n", rc);
                GOTO(out, rc);
        }
        rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL);
        if (rc) {
                CERROR("llog_init_handle failed %d\n", rc);
                GOTO(release_llh, rc);
        }

        if (cb) {
                rc = llog_cat_process(llh, (llog_cb_t)cb, NULL);
                if (rc != LLOG_PROC_BREAK)
                        CERROR("llog_cat_process failed %d\n", rc);
        } else {
                CWARN("no callback function for recovery\n");
        }

        CDEBUG(D_HA, "send llcd %p:%p forcibly after recovery\n",
               ctxt->loc_llcd, ctxt);
        llog_sync(ctxt, NULL);

release_llh:
        rc = llog_cat_put(llh);
        if (rc)
                CERROR("llog_cat_put failed %d\n", rc);
out:
        llog_ctxt_put(ctxt);
        RETURN(rc);
}

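/*
 * Fill the shared llpa block under llpa_sem and spawn
 * log_process_thread() to run @handle over the log identified by @arg.
 * The semaphore is released by the thread once it has copied the
 * arguments.
 */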
static int llog_recovery_generic(struct llog_ctxt *ctxt, void *handle, void *arg)
{
        struct obd_device *obd = ctxt->loc_obd;
        int rc;
        ENTRY;

        if (obd->obd_stopping)
                RETURN(-ENODEV);

        mutex_down(&llpa.llpa_sem);
        llpa.llpa_cb = handle;
        llpa.llpa_arg = arg;
        llpa.llpa_ctxt = llog_ctxt_get(ctxt);
        if (!llpa.llpa_ctxt) {
                up(&llpa.llpa_sem);
                RETURN(-ENODEV);
        }
        rc = cfs_kernel_thread(log_process_thread, &llpa, CLONE_VM | CLONE_FILES);
        if (rc < 0) {
                llog_ctxt_put(ctxt);
                CERROR("error starting log_process_thread: %d\n", rc);
        } else {
                CDEBUG(D_HA, "log_process_thread: %d\n", rc);
                rc = 0;
        }

        RETURN(rc);
}

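/*
 * Called when a replication connection is (re-)established: flush any
 * llcd left over from before the reconnect, record the new llog
 * generation, attach a fresh llcd to the context and kick off llog
 * recovery through llog_recovery_generic().
 */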
int llog_repl_connect(struct llog_ctxt *ctxt, int count,
                      struct llog_logid *logid, struct llog_gen *gen,
                      struct obd_uuid *uuid)
{
        struct llog_canceld_ctxt *llcd;
        int rc;
        ENTRY;

        /* send back llcd before recovery from llog */
        if (ctxt->loc_llcd != NULL) {
                CWARN("llcd %p:%p not empty\n", ctxt->loc_llcd, ctxt);
                llog_sync(ctxt, NULL);
        }

        mutex_down(&ctxt->loc_sem);
        ctxt->loc_gen = *gen;
        llcd = ctxt_llcd_grab(ctxt);
        if (llcd == NULL) {
                CERROR("couldn't get an llcd\n");
                mutex_up(&ctxt->loc_sem);
                RETURN(-ENOMEM);
        }
        mutex_up(&ctxt->loc_sem);

        rc = llog_recovery_generic(ctxt, ctxt->llog_proc_cb, logid);
        if (rc != 0) {
                ctxt_llcd_put(ctxt);
                CERROR("error recovery process: %d\n", rc);
        }
        RETURN(rc);
}
EXPORT_SYMBOL(llog_repl_connect);

#else /* !__KERNEL__ */

int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
                         struct lov_stripe_md *lsm, int count,
                         struct llog_cookie *cookies, int flags)
{
        return 0;
}
#endif