Whamcloud - gitweb
91bd48fdba9851b08928d7929f42fe4cb09f9cee
[fs/lustre-release.git] / lustre / ptlrpc / recov_thread.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2003 Cluster File Systems, Inc.
5  *   Author: Andreas Dilger <adilger@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
25  * OST<->MDS recovery logging thread.
26  *
27  * Invariants in implementation:
28  * - we do not share logs among different OST<->MDS connections, so that
29  *   if an OST or MDS fails it need only look at log(s) relevant to itself
30  */
31
32 #define DEBUG_SUBSYSTEM S_LOG
33
34 #ifndef EXPORT_SYMTAB
35 # define EXPORT_SYMTAB
36 #endif
37
38 #ifdef __KERNEL__
39 # include <libcfs/libcfs.h>
40 #else
41 # include <libcfs/list.h>
42 # include <liblustre.h>
43 #endif
44
45 #include <libcfs/kp30.h>
46 #include <obd_class.h>
47 #include <lustre_commit_confd.h>
48 #include <obd_support.h>
49 #include <obd_class.h>
50 #include <lustre_net.h>
51 #include <lnet/types.h>
52 #include <libcfs/list.h>
53 #include <lustre_log.h>
54 #include "ptlrpc_internal.h"
55
56 #ifdef __KERNEL__
57
58 static struct llog_commit_master lustre_lcm;
59 static struct llog_commit_master *lcm = &lustre_lcm;
60
61 /* Allocate new commit structs in case we do not have enough.
62  * Make the llcd size small enough that it fits into a single page when we
63  * are sending/receiving it. */
64 static int llcd_alloc(void)
65 {
66         struct llog_canceld_ctxt *llcd;
67         int llcd_size;
68
69         /* payload of lustre_msg V2 is bigger */
70         llcd_size = 4096 - lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL);
71         OBD_ALLOC(llcd,
72                   llcd_size + offsetof(struct llog_canceld_ctxt, llcd_cookies));
73         if (llcd == NULL)
74                 return -ENOMEM;
75
76         llcd->llcd_size = llcd_size;
77         llcd->llcd_lcm = lcm;
78
79         spin_lock(&lcm->lcm_llcd_lock);
80         list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);
81         atomic_inc(&lcm->lcm_llcd_numfree);
82         spin_unlock(&lcm->lcm_llcd_lock);
83
84         return 0;
85 }
86
87 /* Get a free cookie struct from the list */
88 struct llog_canceld_ctxt *llcd_grab(void)
89 {
90         struct llog_canceld_ctxt *llcd;
91
92 repeat:
93         spin_lock(&lcm->lcm_llcd_lock);
94         if (list_empty(&lcm->lcm_llcd_free)) {
95                 spin_unlock(&lcm->lcm_llcd_lock);
96                 if (llcd_alloc() < 0) {
97                         CERROR("unable to allocate log commit data!\n");
98                         return NULL;
99                 }
100                 /* check new llcd wasn't grabbed while lock dropped, b=7407 */
101                 goto repeat;
102         }
103
104         llcd = list_entry(lcm->lcm_llcd_free.next, typeof(*llcd), llcd_list);
105         list_del(&llcd->llcd_list);
106         atomic_dec(&lcm->lcm_llcd_numfree);
107         spin_unlock(&lcm->lcm_llcd_lock);
108
109         llcd->llcd_cookiebytes = 0;
110
111         return llcd;
112 }
113 EXPORT_SYMBOL(llcd_grab);
114
115 static void llcd_put(struct llog_canceld_ctxt *llcd)
116 {
117         if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) {
118                 int llcd_size = llcd->llcd_size +
119                          offsetof(struct llog_canceld_ctxt, llcd_cookies);
120                 OBD_FREE(llcd, llcd_size);
121         } else {
122                 spin_lock(&lcm->lcm_llcd_lock);
123                 list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);
124                 atomic_inc(&lcm->lcm_llcd_numfree);
125                 spin_unlock(&lcm->lcm_llcd_lock);
126         }
127 }
128
129 /* Send some cookies to the appropriate target */
130 void llcd_send(struct llog_canceld_ctxt *llcd)
131 {
132         spin_lock(&llcd->llcd_lcm->lcm_llcd_lock);
133         list_add_tail(&llcd->llcd_list, &llcd->llcd_lcm->lcm_llcd_pending);
134         spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock);
135
136         cfs_waitq_signal_nr(&llcd->llcd_lcm->lcm_waitq, 1);
137 }
138 EXPORT_SYMBOL(llcd_send);
139
140 /* deleted objects have a commit callback that cancels the MDS
141  * log record for the deletion.  The commit callback calls this
142  * function
143  */
144 int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
145                          struct lov_stripe_md *lsm, int count,
146                          struct llog_cookie *cookies, int flags)
147 {
148         struct llog_canceld_ctxt *llcd;
149         int rc = 0;
150         ENTRY;
151
152         LASSERT(ctxt);
153
154         mutex_down(&ctxt->loc_sem);
155         if (ctxt->loc_imp == NULL) {
156                 CDEBUG(D_RPCTRACE, "no import for ctxt %p\n", ctxt);
157                 GOTO(out, rc = 0);
158         }
159
160         llcd = ctxt->loc_llcd;
161
162         if (count > 0 && cookies != NULL) {
163                 if (llcd == NULL) {
164                         llcd = llcd_grab();
165                         if (llcd == NULL) {
166                                 CERROR("couldn't get an llcd - dropped "LPX64
167                                        ":%x+%u\n",
168                                        cookies->lgc_lgl.lgl_oid,
169                                        cookies->lgc_lgl.lgl_ogen, 
170                                        cookies->lgc_index);
171                                 GOTO(out, rc = -ENOMEM);
172                         }
173                         llcd->llcd_ctxt = ctxt;
174                         ctxt->loc_llcd = llcd;
175                 }
176
177                 memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes, 
178                        cookies, sizeof(*cookies));
179                 llcd->llcd_cookiebytes += sizeof(*cookies);
180         } else {
181                 if (llcd == NULL || !(flags & OBD_LLOG_FL_SENDNOW))
182                         GOTO(out, rc);
183         }
184
185         if ((llcd->llcd_size - llcd->llcd_cookiebytes) < sizeof(*cookies) ||
186             (flags & OBD_LLOG_FL_SENDNOW)) {
187                 CDEBUG(D_RPCTRACE, "send llcd %p:%p\n", llcd, llcd->llcd_ctxt);
188                 ctxt->loc_llcd = NULL;
189                 llcd_send(llcd);
190         }
191 out:
192         mutex_up(&ctxt->loc_sem);
193         return rc;
194 }
195 EXPORT_SYMBOL(llog_obd_repl_cancel);
196
197 int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp)
198 {
199         int rc = 0;
200         ENTRY;
201
202         if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {
203                 CDEBUG(D_RPCTRACE,"reverse import disconnect, put llcd %p:%p\n",
204                        ctxt->loc_llcd, ctxt);
205                 mutex_down(&ctxt->loc_sem);
206                 if (ctxt->loc_llcd != NULL) {
207                         llcd_put(ctxt->loc_llcd);
208                         ctxt->loc_llcd = NULL;
209                 }
210                 ctxt->loc_imp = NULL;
211                 mutex_up(&ctxt->loc_sem);
212         } else {
213                 rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW);
214         }
215
216         RETURN(rc);
217 }
218 EXPORT_SYMBOL(llog_obd_repl_sync);
219
220 static inline void stop_log_commit(struct llog_commit_master *lcm,
221                                    struct llog_commit_daemon *lcd,
222                                    int rc)
223 {
224         CERROR("error preparing commit: rc %d\n", rc);
225
226         spin_lock(&lcm->lcm_llcd_lock);
227         list_splice(&lcd->lcd_llcd_list, &lcm->lcm_llcd_resend);
228         CFS_INIT_LIST_HEAD(&lcd->lcd_llcd_list);
229         spin_unlock(&lcm->lcm_llcd_lock);
230 }
231
232 static int log_commit_thread(void *arg)
233 {
234         struct llog_commit_master *lcm = arg;
235         struct llog_commit_daemon *lcd;
236         struct llog_canceld_ctxt *llcd, *n;
237         struct obd_import *import = NULL;
238         ENTRY;
239
240         OBD_ALLOC(lcd, sizeof(*lcd));
241         if (lcd == NULL)
242                 RETURN(-ENOMEM);
243
244         spin_lock(&lcm->lcm_thread_lock);
245         THREAD_NAME(cfs_curproc_comm(), CFS_CURPROC_COMM_MAX - 1,
246                     "ll_log_comt_%02d", atomic_read(&lcm->lcm_thread_total));
247         atomic_inc(&lcm->lcm_thread_total);
248         spin_unlock(&lcm->lcm_thread_lock);
249
250         ptlrpc_daemonize(cfs_curproc_comm()); /* thread never needs to do IO */
251
252         CFS_INIT_LIST_HEAD(&lcd->lcd_lcm_list);
253         CFS_INIT_LIST_HEAD(&lcd->lcd_llcd_list);
254         lcd->lcd_lcm = lcm;
255
256         CDEBUG(D_HA, "%s started\n", cfs_curproc_comm());
257         do {
258                 struct ptlrpc_request *request;
259                 struct list_head *sending_list;
260                 int rc = 0;
261
262                 if (import)
263                         class_import_put(import);
264                 import = NULL;
265
266                 /* If we do not have enough pages available, allocate some */
267                 while (atomic_read(&lcm->lcm_llcd_numfree) <
268                        lcm->lcm_llcd_minfree) {
269                         if (llcd_alloc() < 0)
270                                 break;
271                 }
272
273                 spin_lock(&lcm->lcm_thread_lock);
274                 atomic_inc(&lcm->lcm_thread_numidle);
275                 list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_idle);
276                 spin_unlock(&lcm->lcm_thread_lock);
277
278                 wait_event_interruptible(lcm->lcm_waitq,
279                                          !list_empty(&lcm->lcm_llcd_pending) ||
280                                          lcm->lcm_flags & LLOG_LCM_FL_EXIT);
281
282                 /* If we are the last available thread, start a new one in case
283                  * we get blocked on an RPC (nobody else will start a new one)*/
284                 spin_lock(&lcm->lcm_thread_lock);
285                 atomic_dec(&lcm->lcm_thread_numidle);
286                 list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_busy);
287                 spin_unlock(&lcm->lcm_thread_lock);
288
289                 sending_list = &lcm->lcm_llcd_pending;
290         resend:
291                 if (import)
292                         class_import_put(import);
293                 import = NULL;
294                 if (lcm->lcm_flags & LLOG_LCM_FL_EXIT) {
295                         lcm->lcm_llcd_maxfree = 0;
296                         lcm->lcm_llcd_minfree = 0;
297                         lcm->lcm_thread_max = 0;
298
299                         if (list_empty(&lcm->lcm_llcd_pending) ||
300                             lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE)
301                                 break;
302                 }
303
304                 if (atomic_read(&lcm->lcm_thread_numidle) <= 1 &&
305                     atomic_read(&lcm->lcm_thread_total) < lcm->lcm_thread_max) {
306                         rc = llog_start_commit_thread();
307                         if (rc < 0)
308                                 CERROR("error starting thread: rc %d\n", rc);
309                 }
310
311                 /* Move all of the pending cancels from the same OST off of
312                  * the list, so we don't get multiple threads blocked and/or
313                  * doing upcalls on the same OST in case of failure. */
314                 spin_lock(&lcm->lcm_llcd_lock);
315                 if (!list_empty(sending_list)) {
316                         list_move_tail(sending_list->next,
317                                        &lcd->lcd_llcd_list);
318                         llcd = list_entry(lcd->lcd_llcd_list.next,
319                                           typeof(*llcd), llcd_list);
320                         LASSERT(llcd->llcd_lcm == lcm);
321                         import = llcd->llcd_ctxt->loc_imp;
322                         if (import)
323                                 class_import_get(import);
324                 }
325                 list_for_each_entry_safe(llcd, n, sending_list, llcd_list) {
326                         LASSERT(llcd->llcd_lcm == lcm);
327                         if (import == llcd->llcd_ctxt->loc_imp)
328                                 list_move_tail(&llcd->llcd_list,
329                                                &lcd->lcd_llcd_list);
330                 }
331                 if (sending_list != &lcm->lcm_llcd_resend) {
332                         list_for_each_entry_safe(llcd, n, &lcm->lcm_llcd_resend,
333                                                  llcd_list) {
334                                 LASSERT(llcd->llcd_lcm == lcm);
335                                 if (import == llcd->llcd_ctxt->loc_imp)
336                                         list_move_tail(&llcd->llcd_list,
337                                                        &lcd->lcd_llcd_list);
338                         }
339                 }
340                 spin_unlock(&lcm->lcm_llcd_lock);
341
342                 /* We are the only one manipulating our local list - no lock */
343                 list_for_each_entry_safe(llcd,n, &lcd->lcd_llcd_list,llcd_list){
344                         char *bufs[2] = { NULL, (char *)llcd->llcd_cookies };
345
346                         list_del(&llcd->llcd_list);
347                         if (llcd->llcd_cookiebytes == 0) {
348                                 CDEBUG(D_RPCTRACE, "put empty llcd %p:%p\n",
349                                        llcd, llcd->llcd_ctxt);
350                                 llcd_put(llcd);
351                                 continue;
352                         }
353
354                         mutex_down(&llcd->llcd_ctxt->loc_sem);
355                         if (llcd->llcd_ctxt->loc_imp == NULL) {
356                                 mutex_up(&llcd->llcd_ctxt->loc_sem);
357                                 CWARN("import will be destroyed, put "
358                                       "llcd %p:%p\n", llcd, llcd->llcd_ctxt);
359                                 llcd_put(llcd);
360                                 continue;
361                         }
362                         mutex_up(&llcd->llcd_ctxt->loc_sem);
363
364                         if (!import || (import == LP_POISON) ||
365                             (import->imp_client == LP_POISON)) {
366                                 CERROR("No import %p (llcd=%p, ctxt=%p)\n",
367                                        import, llcd, llcd->llcd_ctxt);
368                                 llcd_put(llcd);
369                                 continue;
370                         }
371
372                         OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_RECOV, 10);
373
374                         request = ptlrpc_request_alloc(import, &RQF_LOG_CANCEL);
375                         if (request == NULL) {
376                                 rc = -ENOMEM;
377                                 stop_log_commit(lcm, lcd, rc);
378                                 break;
379                         }
380
381                         req_capsule_set_size(&request->rq_pill, &RMF_LOGCOOKIES,
382                                              RCL_CLIENT,llcd->llcd_cookiebytes);
383
384                         rc = ptlrpc_request_bufs_pack(request,
385                                                       LUSTRE_LOG_VERSION,
386                                                       OBD_LOG_CANCEL, bufs,
387                                                       NULL);
388                         if (rc) {
389                                 ptlrpc_request_free(request);
390                                 stop_log_commit(lcm, lcd, rc);
391                                 break;
392                         }
393
394                         /* XXX FIXME bug 249, 5515 */
395                         request->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
396                         request->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
397
398                         ptlrpc_request_set_replen(request);
399                         mutex_down(&llcd->llcd_ctxt->loc_sem);
400                         if (llcd->llcd_ctxt->loc_imp == NULL) {
401                                 mutex_up(&llcd->llcd_ctxt->loc_sem);
402                                 CWARN("import will be destroyed, put "
403                                       "llcd %p:%p\n", llcd, llcd->llcd_ctxt);
404                                 llcd_put(llcd);
405                                 ptlrpc_req_finished(request);
406                                 continue;
407                         }
408                         mutex_up(&llcd->llcd_ctxt->loc_sem);
409                         rc = ptlrpc_queue_wait(request);
410                         ptlrpc_req_finished(request);
411
412                         /* If the RPC failed, we put this and the remaining
413                          * messages onto the resend list for another time. */
414                         if (rc == 0) {
415                                 llcd_put(llcd);
416                                 continue;
417                         }
418
419                         CERROR("commit %p:%p drop %d cookies: rc %d\n",
420                                llcd, llcd->llcd_ctxt,
421                                (int)(llcd->llcd_cookiebytes /
422                                      sizeof(*llcd->llcd_cookies)), rc);
423                         llcd_put(llcd);
424                 }
425
426                 if (rc == 0) {
427                         sending_list = &lcm->lcm_llcd_resend;
428                         if (!list_empty(sending_list))
429                                 goto resend;
430                 }
431         } while(1);
432
433         if (import)
434                 class_import_put(import);
435
436         /* If we are force exiting, just drop all of the cookies. */
437         if (lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE) {
438                 spin_lock(&lcm->lcm_llcd_lock);
439                 list_splice(&lcm->lcm_llcd_pending, &lcd->lcd_llcd_list);
440                 list_splice(&lcm->lcm_llcd_resend, &lcd->lcd_llcd_list);
441                 list_splice(&lcm->lcm_llcd_free, &lcd->lcd_llcd_list);
442                 spin_unlock(&lcm->lcm_llcd_lock);
443
444                 list_for_each_entry_safe(llcd, n, &lcd->lcd_llcd_list,llcd_list)
445                         llcd_put(llcd);
446         }
447
448         spin_lock(&lcm->lcm_thread_lock);
449         list_del(&lcd->lcd_lcm_list);
450         spin_unlock(&lcm->lcm_thread_lock);
451         OBD_FREE(lcd, sizeof(*lcd));
452
453         CDEBUG(D_HA, "%s exiting\n", cfs_curproc_comm());
454
455         spin_lock(&lcm->lcm_thread_lock);
456         atomic_dec(&lcm->lcm_thread_total);
457         spin_unlock(&lcm->lcm_thread_lock);
458         cfs_waitq_signal(&lcm->lcm_waitq);
459
460         return 0;
461 }
462
463 int llog_start_commit_thread(void)
464 {
465         int rc;
466         ENTRY;
467
468         if (atomic_read(&lcm->lcm_thread_total) >= lcm->lcm_thread_max)
469                 RETURN(0);
470
471         rc = cfs_kernel_thread(log_commit_thread, lcm, CLONE_VM | CLONE_FILES);
472         if (rc < 0) {
473                 CERROR("error starting thread #%d: %d\n",
474                        atomic_read(&lcm->lcm_thread_total), rc);
475                 RETURN(rc);
476         }
477
478         RETURN(0);
479 }
480 EXPORT_SYMBOL(llog_start_commit_thread);
481
482 static struct llog_process_args {
483         struct semaphore         llpa_sem;
484         struct llog_ctxt        *llpa_ctxt;
485         void                    *llpa_cb;
486         void                    *llpa_arg;
487 } llpa;
488
489 int llog_init_commit_master(void)
490 {
491         CFS_INIT_LIST_HEAD(&lcm->lcm_thread_busy);
492         CFS_INIT_LIST_HEAD(&lcm->lcm_thread_idle);
493         spin_lock_init(&lcm->lcm_thread_lock);
494         atomic_set(&lcm->lcm_thread_numidle, 0);
495         cfs_waitq_init(&lcm->lcm_waitq);
496         CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_pending);
497         CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_resend);
498         CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_free);
499         spin_lock_init(&lcm->lcm_llcd_lock);
500         atomic_set(&lcm->lcm_llcd_numfree, 0);
501         lcm->lcm_llcd_minfree = 0;
502         lcm->lcm_thread_max = 5;
503         /* FIXME initialize semaphore for llog_process_args */
504         sema_init(&llpa.llpa_sem, 1);
505         return 0;
506 }
507
508 int llog_cleanup_commit_master(int force)
509 {
510         lcm->lcm_flags |= LLOG_LCM_FL_EXIT;
511         if (force)
512                 lcm->lcm_flags |= LLOG_LCM_FL_EXIT_FORCE;
513         cfs_waitq_signal(&lcm->lcm_waitq);
514
515         wait_event_interruptible(lcm->lcm_waitq,
516                                  atomic_read(&lcm->lcm_thread_total) == 0);
517         return 0;
518 }
519
520 static int log_process_thread(void *args)
521 {
522         struct llog_process_args *data = args;
523         struct llog_ctxt *ctxt = data->llpa_ctxt;
524         void   *cb = data->llpa_cb;
525         struct llog_logid logid = *(struct llog_logid *)(data->llpa_arg);
526         struct llog_handle *llh = NULL;
527         int rc;
528         ENTRY;
529
530         mutex_up(&data->llpa_sem);
531         ptlrpc_daemonize("llog_process");     /* thread does IO to log files */
532
533         rc = llog_create(ctxt, &llh, &logid, NULL);
534         if (rc) {
535                 CERROR("llog_create failed %d\n", rc);
536                 RETURN(rc);
537         }
538         rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL);
539         if (rc) {
540                 CERROR("llog_init_handle failed %d\n", rc);
541                 GOTO(out, rc);
542         }
543
544         if (cb) {
545                 rc = llog_cat_process(llh, (llog_cb_t)cb, NULL);
546                 if (rc != LLOG_PROC_BREAK)
547                         CERROR("llog_cat_process failed %d\n", rc);
548         } else {
549                 CWARN("no callback function for recovery\n");
550         }
551
552         CDEBUG(D_HA, "send llcd %p:%p forcibly after recovery\n",
553                ctxt->loc_llcd, ctxt);
554         llog_sync(ctxt, NULL);
555 out:
556         rc = llog_cat_put(llh);
557         if (rc)
558                 CERROR("llog_cat_put failed %d\n", rc);
559
560         RETURN(rc);
561 }
562
563 static int llog_recovery_generic(struct llog_ctxt *ctxt, void *handle,void *arg)
564 {
565         int rc;
566         ENTRY;
567
568         mutex_down(&llpa.llpa_sem);
569         llpa.llpa_ctxt = ctxt;
570         llpa.llpa_cb = handle;
571         llpa.llpa_arg = arg;
572
573         rc = cfs_kernel_thread(log_process_thread, &llpa, CLONE_VM | CLONE_FILES);
574         if (rc < 0)
575                 CERROR("error starting log_process_thread: %d\n", rc);
576         else {
577                 CDEBUG(D_HA, "log_process_thread: %d\n", rc);
578                 rc = 0;
579         }
580
581         RETURN(rc);
582 }
583
584 int llog_repl_connect(struct llog_ctxt *ctxt, int count,
585                       struct llog_logid *logid, struct llog_gen *gen,
586                       struct obd_uuid *uuid)
587 {
588         struct llog_canceld_ctxt *llcd;
589         int rc;
590         ENTRY;
591
592         /* send back llcd before recovery from llog */
593         if (ctxt->loc_llcd != NULL) {
594                 CWARN("llcd %p:%p not empty\n", ctxt->loc_llcd, ctxt);
595                 llog_sync(ctxt, NULL);
596         }
597
598         mutex_down(&ctxt->loc_sem);
599         ctxt->loc_gen = *gen;
600         llcd = llcd_grab();
601         if (llcd == NULL) {
602                 CERROR("couldn't get an llcd\n");
603                 mutex_up(&ctxt->loc_sem);
604                 RETURN(-ENOMEM);
605         }
606         llcd->llcd_ctxt = ctxt;
607         ctxt->loc_llcd = llcd;
608         mutex_up(&ctxt->loc_sem);
609
610         rc = llog_recovery_generic(ctxt, ctxt->llog_proc_cb, logid);
611         if (rc != 0)
612                 CERROR("error recovery process: %d\n", rc);
613
614         RETURN(rc);
615 }
616 EXPORT_SYMBOL(llog_repl_connect);
617
618 #else /* !__KERNEL__ */
619
/*
 * Non-kernel build: there are no commit threads here, so cancelling is a
 * no-op that always reports success.  All parameters are ignored.
 */
int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
                         struct lov_stripe_md *lsm, int count,
                         struct llog_cookie *cookies, int flags)
{
        (void)ctxt;
        (void)lsm;
        (void)count;
        (void)cookies;
        (void)flags;

        return 0;
}
626 #endif