Whamcloud - gitweb
file lustre_types.h was added on branch b1_4_mountconf on 2006-04-26 18:45:44 +0000
[fs/lustre-release.git] / lustre / ptlrpc / recov_thread.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2003 Cluster File Systems, Inc.
5  *   Author: Andreas Dilger <adilger@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
25  * OST<->MDS recovery logging thread.
26  *
27  * Invariants in implementation:
28  * - we do not share logs among different OST<->MDS connections, so that
29  *   if an OST or MDS fails it need only look at log(s) relevant to itself
30  */
31
32 #define DEBUG_SUBSYSTEM S_LOG
33
34 #ifndef EXPORT_SYMTAB
35 # define EXPORT_SYMTAB
36 #endif
37
38 #ifdef __KERNEL__
39 #include <linux/fs.h>
40 #else
41 # include <libcfs/list.h>
42 # include <liblustre.h>
43 #endif
44
45 #include <libcfs/kp30.h>
46 #include <linux/obd_class.h>
47 #include <linux/lustre_commit_confd.h>
48 #include <linux/obd_support.h>
49 #include <linux/obd_class.h>
50 #include <linux/lustre_net.h>
51 #include <lnet/types.h>
52 #include <libcfs/list.h>
53 #include <linux/lustre_log.h>
54 #include "ptlrpc_internal.h"
55
56 #ifdef __KERNEL__
57
58 static struct llog_commit_master lustre_lcm;
59 static struct llog_commit_master *lcm = &lustre_lcm;
60
61 /* Allocate new commit structs in case we do not have enough.
62  * Make the llcd size small enough that it fits into a single page when we
63  * are sending/receiving it. */
64 static int llcd_alloc(void)
65 {
66         struct llog_canceld_ctxt *llcd;
67         int llcd_size = 0;
68
69         llcd_size = 4096 - lustre_msg_size(1, &llcd_size);
70         OBD_ALLOC(llcd,
71                   llcd_size + offsetof(struct llog_canceld_ctxt, llcd_cookies));
72         if (llcd == NULL)
73                 return -ENOMEM;
74
75         llcd->llcd_size = llcd_size;
76         llcd->llcd_lcm = lcm;
77
78         spin_lock(&lcm->lcm_llcd_lock);
79         list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);
80         atomic_inc(&lcm->lcm_llcd_numfree);
81         spin_unlock(&lcm->lcm_llcd_lock);
82
83         return 0;
84 }
85
86 /* Get a free cookie struct from the list */
87 struct llog_canceld_ctxt *llcd_grab(void)
88 {
89         struct llog_canceld_ctxt *llcd;
90
91 repeat:
92         spin_lock(&lcm->lcm_llcd_lock);
93         if (list_empty(&lcm->lcm_llcd_free)) {
94                 spin_unlock(&lcm->lcm_llcd_lock);
95                 if (llcd_alloc() < 0) {
96                         CERROR("unable to allocate log commit data!\n");
97                         return NULL;
98                 }
99                 /* check new llcd wasn't grabbed while lock dropped, b=7407 */
100                 goto repeat;
101         }
102
103         llcd = list_entry(lcm->lcm_llcd_free.next, typeof(*llcd), llcd_list);
104         list_del(&llcd->llcd_list);
105         atomic_dec(&lcm->lcm_llcd_numfree);
106         spin_unlock(&lcm->lcm_llcd_lock);
107
108         llcd->llcd_cookiebytes = 0;
109
110         return llcd;
111 }
112 EXPORT_SYMBOL(llcd_grab);
113
114 static void llcd_put(struct llog_canceld_ctxt *llcd)
115 {
116         if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) {
117                 int llcd_size = llcd->llcd_size +
118                          offsetof(struct llog_canceld_ctxt, llcd_cookies);
119                 OBD_FREE(llcd, llcd_size);
120         } else {
121                 spin_lock(&lcm->lcm_llcd_lock);
122                 list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);
123                 atomic_inc(&lcm->lcm_llcd_numfree);
124                 spin_unlock(&lcm->lcm_llcd_lock);
125         }
126 }
127
128 /* Send some cookies to the appropriate target */
129 void llcd_send(struct llog_canceld_ctxt *llcd)
130 {
131         spin_lock(&llcd->llcd_lcm->lcm_llcd_lock);
132         list_add_tail(&llcd->llcd_list, &llcd->llcd_lcm->lcm_llcd_pending);
133         spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock);
134
135         wake_up_nr(&llcd->llcd_lcm->lcm_waitq, 1);
136 }
137 EXPORT_SYMBOL(llcd_send);
138
139 /* deleted objects have a commit callback that cancels the MDS
140  * log record for the deletion.  The commit callback calls this
141  * function
142  */
143 int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
144                          struct lov_stripe_md *lsm, int count,
145                          struct llog_cookie *cookies, int flags)
146 {
147         struct llog_canceld_ctxt *llcd;
148         int rc = 0;
149         ENTRY;
150
151         LASSERT(ctxt);
152
153         down(&ctxt->loc_sem);
154         if (ctxt->loc_imp == NULL) {
155                 CWARN("no import for ctxt %p\n", ctxt);
156                 GOTO(out, rc = 0);
157         }
158
159         llcd = ctxt->loc_llcd;
160
161         if (count > 0 && cookies != NULL) {
162                 if (llcd == NULL) {
163                         llcd = llcd_grab();
164                         if (llcd == NULL) {
165                                 CERROR("couldn't get an llcd - dropped "LPX64
166                                        ":%x+%u\n",
167                                        cookies->lgc_lgl.lgl_oid,
168                                        cookies->lgc_lgl.lgl_ogen, 
169                                        cookies->lgc_index);
170                                 GOTO(out, rc = -ENOMEM);
171                         }
172                         llcd->llcd_ctxt = ctxt;
173                         ctxt->loc_llcd = llcd;
174                 }
175
176                 memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes, 
177                        cookies, sizeof(*cookies));
178                 llcd->llcd_cookiebytes += sizeof(*cookies);
179         } else {
180                 if (llcd == NULL || !(flags & OBD_LLOG_FL_SENDNOW))
181                         GOTO(out, rc);
182         }
183
184         if ((llcd->llcd_size - llcd->llcd_cookiebytes) < sizeof(*cookies) ||
185             (flags & OBD_LLOG_FL_SENDNOW)) {
186                 CDEBUG(D_HA, "send llcd %p:%p\n", llcd, llcd->llcd_ctxt);
187                 ctxt->loc_llcd = NULL;
188                 llcd_send(llcd);
189         }
190 out:
191         up(&ctxt->loc_sem);
192         return rc;
193 }
194 EXPORT_SYMBOL(llog_obd_repl_cancel);
195
196 int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp)
197 {
198         int rc = 0;
199         ENTRY;
200
201         if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {
202                 CDEBUG(D_HA, "reverse import disconnected, put llcd %p:%p\n",
203                        ctxt->loc_llcd, ctxt);
204                 down(&ctxt->loc_sem);
205                 if (ctxt->loc_llcd != NULL) {
206                         llcd_put(ctxt->loc_llcd);
207                         ctxt->loc_llcd = NULL;
208                 }
209                 ctxt->loc_imp = NULL;
210                 up(&ctxt->loc_sem);
211         } else {
212                 rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW);
213         }
214
215         RETURN(rc);
216 }
217 EXPORT_SYMBOL(llog_obd_repl_sync);
218
219 static int log_commit_thread(void *arg)
220 {
221         struct llog_commit_master *lcm = arg;
222         struct llog_commit_daemon *lcd;
223         struct llog_canceld_ctxt *llcd, *n;
224         unsigned long flags;
225         ENTRY;
226
227         OBD_ALLOC(lcd, sizeof(*lcd));
228         if (lcd == NULL)
229                 RETURN(-ENOMEM);
230
231         lock_kernel();
232         ptlrpc_daemonize(); /* thread never needs to do IO */
233
234         SIGNAL_MASK_LOCK(current, flags);
235         sigfillset(&current->blocked);
236         RECALC_SIGPENDING;
237         SIGNAL_MASK_UNLOCK(current, flags);
238
239         spin_lock(&lcm->lcm_thread_lock);
240         THREAD_NAME(current->comm, sizeof(current->comm) - 1,
241                     "ll_log_comt_%02d", atomic_read(&lcm->lcm_thread_total));
242         atomic_inc(&lcm->lcm_thread_total);
243         spin_unlock(&lcm->lcm_thread_lock);
244         unlock_kernel();
245
246         INIT_LIST_HEAD(&lcd->lcd_lcm_list);
247         INIT_LIST_HEAD(&lcd->lcd_llcd_list);
248         lcd->lcd_lcm = lcm;
249
250         CDEBUG(D_HA, "%s started\n", current->comm);
251         do {
252                 struct ptlrpc_request *request;
253                 struct obd_import *import = NULL;
254                 struct list_head *sending_list;
255                 int rc = 0;
256
257                 /* If we do not have enough pages available, allocate some */
258                 while (atomic_read(&lcm->lcm_llcd_numfree) <
259                        lcm->lcm_llcd_minfree) {
260                         if (llcd_alloc() < 0)
261                                 break;
262                 }
263
264                 spin_lock(&lcm->lcm_thread_lock);
265                 atomic_inc(&lcm->lcm_thread_numidle);
266                 list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_idle);
267                 spin_unlock(&lcm->lcm_thread_lock);
268
269                 wait_event_interruptible(lcm->lcm_waitq,
270                                          !list_empty(&lcm->lcm_llcd_pending) ||
271                                          lcm->lcm_flags & LLOG_LCM_FL_EXIT);
272
273                 /* If we are the last available thread, start a new one in case
274                  * we get blocked on an RPC (nobody else will start a new one)*/
275                 spin_lock(&lcm->lcm_thread_lock);
276                 atomic_dec(&lcm->lcm_thread_numidle);
277                 list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_busy);
278                 spin_unlock(&lcm->lcm_thread_lock);
279
280                 sending_list = &lcm->lcm_llcd_pending;
281         resend:
282                 import = NULL;
283                 if (lcm->lcm_flags & LLOG_LCM_FL_EXIT) {
284                         lcm->lcm_llcd_maxfree = 0;
285                         lcm->lcm_llcd_minfree = 0;
286                         lcm->lcm_thread_max = 0;
287
288                         if (list_empty(&lcm->lcm_llcd_pending) ||
289                             lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE)
290                                 break;
291                 }
292
293                 if (atomic_read(&lcm->lcm_thread_numidle) <= 1 &&
294                     atomic_read(&lcm->lcm_thread_total) < lcm->lcm_thread_max) {
295                         rc = llog_start_commit_thread();
296                         if (rc < 0)
297                                 CERROR("error starting thread: rc %d\n", rc);
298                 }
299
300                 /* Move all of the pending cancels from the same OST off of
301                  * the list, so we don't get multiple threads blocked and/or
302                  * doing upcalls on the same OST in case of failure. */
303                 spin_lock(&lcm->lcm_llcd_lock);
304                 if (!list_empty(sending_list)) {
305                         list_move_tail(sending_list->next,
306                                        &lcd->lcd_llcd_list);
307                         llcd = list_entry(lcd->lcd_llcd_list.next,
308                                           typeof(*llcd), llcd_list);
309                         LASSERT(llcd->llcd_lcm == lcm);
310                         import = llcd->llcd_ctxt->loc_imp;
311                 }
312                 list_for_each_entry_safe(llcd, n, sending_list, llcd_list) {
313                         LASSERT(llcd->llcd_lcm == lcm);
314                         if (import == llcd->llcd_ctxt->loc_imp)
315                                 list_move_tail(&llcd->llcd_list,
316                                                &lcd->lcd_llcd_list);
317                 }
318                 if (sending_list != &lcm->lcm_llcd_resend) {
319                         list_for_each_entry_safe(llcd, n, &lcm->lcm_llcd_resend,
320                                                  llcd_list) {
321                                 LASSERT(llcd->llcd_lcm == lcm);
322                                 if (import == llcd->llcd_ctxt->loc_imp)
323                                         list_move_tail(&llcd->llcd_list,
324                                                        &lcd->lcd_llcd_list);
325                         }
326                 }
327                 spin_unlock(&lcm->lcm_llcd_lock);
328
329                 /* We are the only one manipulating our local list - no lock */
330                 list_for_each_entry_safe(llcd,n, &lcd->lcd_llcd_list,llcd_list){
331                         char *bufs[1] = {(char *)llcd->llcd_cookies};
332
333                         list_del(&llcd->llcd_list);
334                         if (llcd->llcd_cookiebytes == 0) {
335                                 CDEBUG(D_HA, "put empty llcd %p:%p\n",
336                                        llcd, llcd->llcd_ctxt);
337                                 llcd_put(llcd);
338                                 continue;
339                         }
340
341                         down(&llcd->llcd_ctxt->loc_sem);
342                         if (llcd->llcd_ctxt->loc_imp == NULL) {
343                                 up(&llcd->llcd_ctxt->loc_sem);
344                                 CWARN("import will be destroyed, put "
345                                       "llcd %p:%p\n", llcd, llcd->llcd_ctxt);
346                                 llcd_put(llcd);
347                                 continue;
348                         }
349                         up(&llcd->llcd_ctxt->loc_sem);
350
351                         if (!import || (import == LP_POISON)) {
352                                 CERROR("No import %p (llcd=%p, ctxt=%p)\n",
353                                        import, llcd, llcd->llcd_ctxt);
354                                 llcd_put(llcd);
355                                 continue;
356                         }
357
358                         request = ptlrpc_prep_req(import, LUSTRE_LOG_VERSION,
359                                                   OBD_LOG_CANCEL, 1,
360                                                   &llcd->llcd_cookiebytes,
361                                                   bufs);
362                         if (request == NULL) {
363                                 rc = -ENOMEM;
364                                 CERROR("error preparing commit: rc %d\n", rc);
365
366                                 spin_lock(&lcm->lcm_llcd_lock);
367                                 list_splice(&lcd->lcd_llcd_list,
368                                             &lcm->lcm_llcd_resend);
369                                 INIT_LIST_HEAD(&lcd->lcd_llcd_list);
370                                 spin_unlock(&lcm->lcm_llcd_lock);
371                                 break;
372                         }
373
374                         /* XXX FIXME bug 249, 5515 */
375                         request->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
376                         request->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
377
378                         request->rq_replen = lustre_msg_size(0, NULL);
379                         down(&llcd->llcd_ctxt->loc_sem);
380                         if (llcd->llcd_ctxt->loc_imp == NULL) {
381                                 up(&llcd->llcd_ctxt->loc_sem);
382                                 CWARN("import will be destroyed, put "
383                                       "llcd %p:%p\n", llcd, llcd->llcd_ctxt);
384                                 llcd_put(llcd);
385                                 ptlrpc_req_finished(request);
386                                 continue;
387                         }
388                         up(&llcd->llcd_ctxt->loc_sem);
389                         rc = ptlrpc_queue_wait(request);
390                         ptlrpc_req_finished(request);
391
392                         /* If the RPC failed, we put this and the remaining
393                          * messages onto the resend list for another time. */
394                         if (rc == 0) {
395                                 llcd_put(llcd);
396                                 continue;
397                         }
398
399                         CERROR("commit %p:%p drop %d cookies: rc %d\n",
400                                llcd, llcd->llcd_ctxt,
401                                (int)(llcd->llcd_cookiebytes /
402                                      sizeof(*llcd->llcd_cookies)), rc);
403                         llcd_put(llcd);
404                 }
405
406                 if (rc == 0) {
407                         sending_list = &lcm->lcm_llcd_resend;
408                         if (!list_empty(sending_list))
409                                 goto resend;
410                 }
411         } while(1);
412
413         /* If we are force exiting, just drop all of the cookies. */
414         if (lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE) {
415                 spin_lock(&lcm->lcm_llcd_lock);
416                 list_splice(&lcm->lcm_llcd_pending, &lcd->lcd_llcd_list);
417                 list_splice(&lcm->lcm_llcd_resend, &lcd->lcd_llcd_list);
418                 list_splice(&lcm->lcm_llcd_free, &lcd->lcd_llcd_list);
419                 spin_unlock(&lcm->lcm_llcd_lock);
420
421                 list_for_each_entry_safe(llcd, n, &lcd->lcd_llcd_list,llcd_list)
422                         llcd_put(llcd);
423         }
424
425         spin_lock(&lcm->lcm_thread_lock);
426         list_del(&lcd->lcd_lcm_list);
427         spin_unlock(&lcm->lcm_thread_lock);
428         OBD_FREE(lcd, sizeof(*lcd));
429
430         CDEBUG(D_HA, "%s exiting\n", current->comm);
431
432         spin_lock(&lcm->lcm_thread_lock);
433         atomic_dec(&lcm->lcm_thread_total);
434         spin_unlock(&lcm->lcm_thread_lock);
435         wake_up(&lcm->lcm_waitq);
436
437         return 0;
438 }
439
440 int llog_start_commit_thread(void)
441 {
442         int rc;
443         ENTRY;
444
445         if (atomic_read(&lcm->lcm_thread_total) >= lcm->lcm_thread_max)
446                 RETURN(0);
447
448         rc = kernel_thread(log_commit_thread, lcm, CLONE_VM | CLONE_FILES);
449         if (rc < 0) {
450                 CERROR("error starting thread #%d: %d\n",
451                        atomic_read(&lcm->lcm_thread_total), rc);
452                 RETURN(rc);
453         }
454
455         RETURN(0);
456 }
457 EXPORT_SYMBOL(llog_start_commit_thread);
458
459 static struct llog_process_args {
460         struct semaphore         llpa_sem;
461         struct llog_ctxt        *llpa_ctxt;
462         void                    *llpa_cb;
463         void                    *llpa_arg;
464 } llpa;
465
466 int llog_init_commit_master(void)
467 {
468         INIT_LIST_HEAD(&lcm->lcm_thread_busy);
469         INIT_LIST_HEAD(&lcm->lcm_thread_idle);
470         spin_lock_init(&lcm->lcm_thread_lock);
471         atomic_set(&lcm->lcm_thread_numidle, 0);
472         init_waitqueue_head(&lcm->lcm_waitq);
473         INIT_LIST_HEAD(&lcm->lcm_llcd_pending);
474         INIT_LIST_HEAD(&lcm->lcm_llcd_resend);
475         INIT_LIST_HEAD(&lcm->lcm_llcd_free);
476         spin_lock_init(&lcm->lcm_llcd_lock);
477         atomic_set(&lcm->lcm_llcd_numfree, 0);
478         lcm->lcm_llcd_minfree = 0;
479         lcm->lcm_thread_max = 5;
480         /* FIXME initialize semaphore for llog_process_args */
481         sema_init(&llpa.llpa_sem, 1);
482         return 0;
483 }
484
485 int llog_cleanup_commit_master(int force)
486 {
487         lcm->lcm_flags |= LLOG_LCM_FL_EXIT;
488         if (force)
489                 lcm->lcm_flags |= LLOG_LCM_FL_EXIT_FORCE;
490         wake_up(&lcm->lcm_waitq);
491
492         wait_event_interruptible(lcm->lcm_waitq,
493                                  atomic_read(&lcm->lcm_thread_total) == 0);
494         return 0;
495 }
496
497 static int log_process_thread(void *args)
498 {
499         struct llog_process_args *data = args;
500         struct llog_ctxt *ctxt = data->llpa_ctxt;
501         void   *cb = data->llpa_cb;
502         struct llog_logid logid = *(struct llog_logid *)(data->llpa_arg);
503         struct llog_handle *llh = NULL;
504         unsigned long flags;
505         int rc;
506         ENTRY;
507
508         up(&data->llpa_sem);
509         lock_kernel();
510         ptlrpc_daemonize();     /* thread does IO to log files */
511         THREAD_NAME(current->comm, sizeof(current->comm) - 1, "llog_process");
512
513         SIGNAL_MASK_LOCK(current, flags);
514         sigfillset(&current->blocked);
515         RECALC_SIGPENDING;
516         SIGNAL_MASK_UNLOCK(current, flags);
517         unlock_kernel();
518
519         rc = llog_create(ctxt, &llh, &logid, NULL);
520         if (rc) {
521                 CERROR("llog_create failed %d\n", rc);
522                 RETURN(rc);
523         }
524         rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL);
525         if (rc) {
526                 CERROR("llog_init_handle failed %d\n", rc);
527                 GOTO(out, rc);
528         }
529
530         if (cb) {
531                 rc = llog_cat_process(llh, (llog_cb_t)cb, NULL);
532                 if (rc != LLOG_PROC_BREAK)
533                         CERROR("llog_cat_process failed %d\n", rc);
534         } else {
535                 CWARN("no callback function for recovery\n");
536         }
537
538         CDEBUG(D_HA, "send llcd %p:%p forcibly after recovery\n",
539                ctxt->loc_llcd, ctxt);
540         llog_sync(ctxt, NULL);
541 out:
542         rc = llog_cat_put(llh);
543         if (rc)
544                 CERROR("llog_cat_put failed %d\n", rc);
545
546         RETURN(rc);
547 }
548
549 static int llog_recovery_generic(struct llog_ctxt *ctxt, void *handle,void *arg)
550 {
551         int rc;
552         ENTRY;
553
554         down(&llpa.llpa_sem);
555         llpa.llpa_ctxt = ctxt;
556         llpa.llpa_cb = handle;
557         llpa.llpa_arg = arg;
558
559         rc = kernel_thread(log_process_thread, &llpa, CLONE_VM | CLONE_FILES);
560         if (rc < 0)
561                 CERROR("error starting log_process_thread: %d\n", rc);
562         else {
563                 CDEBUG(D_HA, "log_process_thread: %d\n", rc);
564                 rc = 0;
565         }
566
567         RETURN(rc);
568 }
569
570 int llog_repl_connect(struct llog_ctxt *ctxt, int count,
571                       struct llog_logid *logid, struct llog_gen *gen,
572                       struct obd_uuid *uuid)
573 {
574         struct llog_canceld_ctxt *llcd;
575         int rc;
576         ENTRY;
577
578         /* send back llcd before recovery from llog */
579         if (ctxt->loc_llcd != NULL) {
580                 CWARN("llcd %p:%p not empty\n", ctxt->loc_llcd, ctxt);
581                 llog_sync(ctxt, NULL);
582         }
583
584         down(&ctxt->loc_sem);
585         ctxt->loc_gen = *gen;
586         llcd = llcd_grab();
587         if (llcd == NULL) {
588                 CERROR("couldn't get an llcd\n");
589                 up(&ctxt->loc_sem);
590                 RETURN(-ENOMEM);
591         }
592         llcd->llcd_ctxt = ctxt;
593         ctxt->loc_llcd = llcd;
594         up(&ctxt->loc_sem);
595
596         rc = llog_recovery_generic(ctxt, ctxt->llog_proc_cb, logid);
597         if (rc != 0)
598                 CERROR("error recovery process: %d\n", rc);
599
600         RETURN(rc);
601 }
602 EXPORT_SYMBOL(llog_repl_connect);
603
604 #else /* !__KERNEL__ */
605
/*
 * Non-kernel build: there are no commit threads, so cancelling is a no-op.
 * All arguments are ignored and 0 is returned.
 */
int llog_obd_repl_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm,
                         int count, struct llog_cookie *cookies, int flags)
{
        return 0;
}
612 #endif