Whamcloud - gitweb
Branch:b1_6
[fs/lustre-release.git] / lustre / ptlrpc / recov_thread.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2003 Cluster File Systems, Inc.
5  *   Author: Andreas Dilger <adilger@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
25  * OST<->MDS recovery logging thread.
26  *
27  * Invariants in implementation:
28  * - we do not share logs among different OST<->MDS connections, so that
29  *   if an OST or MDS fails it need only look at log(s) relevant to itself
30  */
31
32 #define DEBUG_SUBSYSTEM S_LOG
33
34 #ifndef EXPORT_SYMTAB
35 # define EXPORT_SYMTAB
36 #endif
37
38 #ifdef __KERNEL__
39 # include <libcfs/libcfs.h>
40 #else
41 # include <libcfs/list.h>
42 # include <liblustre.h>
43 #endif
44
45 #include <libcfs/kp30.h>
46 #include <obd_class.h>
47 #include <lustre_commit_confd.h>
48 #include <obd_support.h>
49 #include <obd_class.h>
50 #include <lustre_net.h>
51 #include <lnet/types.h>
52 #include <libcfs/list.h>
53 #include <lustre_log.h>
54 #include "ptlrpc_internal.h"
55
56 #ifdef __KERNEL__
57
58 /* Allocate new commit structs in case we do not have enough.
59  * Make the llcd size small enough that it fits into a single page when we
60  * are sending/receiving it. */
61 static int llcd_alloc(struct llog_commit_master *lcm)
62 {
63         struct llog_canceld_ctxt *llcd;
64         int llcd_size;
65
66         /* payload of lustre_msg V2 is bigger */
67         llcd_size = 4096 - lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL);
68         OBD_ALLOC(llcd,
69                   llcd_size + offsetof(struct llog_canceld_ctxt, llcd_cookies));
70         if (llcd == NULL)
71                 return -ENOMEM;
72
73         llcd->llcd_size = llcd_size;
74         llcd->llcd_lcm = lcm;
75
76         spin_lock(&lcm->lcm_llcd_lock);
77         list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);
78         atomic_inc(&lcm->lcm_llcd_numfree);
79         spin_unlock(&lcm->lcm_llcd_lock);
80
81         return 0;
82 }
83
84 /* Get a free cookie struct from the list */
85 static struct llog_canceld_ctxt *llcd_grab(struct llog_commit_master *lcm)
86 {
87         struct llog_canceld_ctxt *llcd;
88
89 repeat:
90         spin_lock(&lcm->lcm_llcd_lock);
91         if (list_empty(&lcm->lcm_llcd_free)) {
92                 spin_unlock(&lcm->lcm_llcd_lock);
93                 if (llcd_alloc(lcm) < 0) {
94                         CERROR("unable to allocate log commit data!\n");
95                         return NULL;
96                 }
97                 /* check new llcd wasn't grabbed while lock dropped, b=7407 */
98                 goto repeat;
99         }
100
101         llcd = list_entry(lcm->lcm_llcd_free.next, typeof(*llcd), llcd_list);
102         list_del(&llcd->llcd_list);
103         atomic_dec(&lcm->lcm_llcd_numfree);
104         spin_unlock(&lcm->lcm_llcd_lock);
105
106         llcd->llcd_cookiebytes = 0;
107
108         return llcd;
109 }
110
111 static void llcd_put(struct llog_canceld_ctxt *llcd)
112 {
113         struct llog_commit_master *lcm = llcd->llcd_lcm;
114
115         llog_ctxt_put(llcd->llcd_ctxt);
116         if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) {
117                 int llcd_size = llcd->llcd_size +
118                          offsetof(struct llog_canceld_ctxt, llcd_cookies);
119                 OBD_FREE(llcd, llcd_size);
120         } else {
121                 spin_lock(&lcm->lcm_llcd_lock);
122                 list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);
123                 atomic_inc(&lcm->lcm_llcd_numfree);
124                 spin_unlock(&lcm->lcm_llcd_lock);
125         }
126 }
127
128 /* Send some cookies to the appropriate target */
129 static void llcd_send(struct llog_canceld_ctxt *llcd)
130 {
131         if (!(llcd->llcd_lcm->lcm_flags & LLOG_LCM_FL_EXIT)) {
132                 spin_lock(&llcd->llcd_lcm->lcm_llcd_lock);
133                 list_add_tail(&llcd->llcd_list,
134                               &llcd->llcd_lcm->lcm_llcd_pending);
135                 spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock);
136         }
137         cfs_waitq_signal_nr(&llcd->llcd_lcm->lcm_waitq, 1);
138 }
139
/* deleted objects have a commit callback that cancels the MDS
 * log record for the deletion.  The commit callback calls this
 * function, which batches cancel cookies into the ctxt's llcd; a full
 * (or explicitly flushed) llcd is handed to the commit threads.
 */
int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
                         struct lov_stripe_md *lsm, int count,
                         struct llog_cookie *cookies, int flags)
{
        struct llog_canceld_ctxt *llcd;
        int rc = 0;
        ENTRY;

        LASSERT(ctxt);

        /* loc_sem serializes access to loc_imp and loc_llcd here */
        mutex_down(&ctxt->loc_sem);
        if (ctxt->loc_imp == NULL) {
                CDEBUG(D_RPCTRACE, "no import for ctxt %p\n", ctxt);
                GOTO(out, rc = 0);
        }

        llcd = ctxt->loc_llcd;

        if (count > 0 && cookies != NULL) {
                if (llcd == NULL) {
                        /* no llcd attached yet: grab one and pin the ctxt
                         * for as long as the llcd references it */
                        llcd = llcd_grab(ctxt->loc_lcm);
                        if (llcd == NULL) {
                                CERROR("couldn't get an llcd - dropped "LPX64
                                       ":%x+%u\n",
                                       cookies->lgc_lgl.lgl_oid,
                                       cookies->lgc_lgl.lgl_ogen,
                                       cookies->lgc_index);
                                GOTO(out, rc = -ENOMEM);
                        }
                        llcd->llcd_ctxt = llog_ctxt_get(ctxt);
                        ctxt->loc_llcd = llcd;
                }

                /* append one cookie; room is guaranteed because a nearly
                 * full llcd is detached and sent by the check below */
                memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes,
                       cookies, sizeof(*cookies));
                llcd->llcd_cookiebytes += sizeof(*cookies);
        } else {
                /* nothing to append: only continue if a flush was asked
                 * for and there is actually an llcd to flush */
                if (llcd == NULL || !(flags & OBD_LLOG_FL_SENDNOW))
                        GOTO(out, rc);
        }

        /* send when no room remains for another cookie, or when the
         * caller requested an immediate flush */
        if ((llcd->llcd_size - llcd->llcd_cookiebytes) < sizeof(*cookies) ||
            (flags & OBD_LLOG_FL_SENDNOW)) {
                CDEBUG(D_RPCTRACE, "send llcd %p:%p\n", llcd, llcd->llcd_ctxt);
                ctxt->loc_llcd = NULL;
                llcd_send(llcd);
        }
out:
        mutex_up(&ctxt->loc_sem);
        return rc;
}
195 EXPORT_SYMBOL(llog_obd_repl_cancel);
196
/* Sync a replication llog context: if the export's reverse import is
 * going away, drop any batched cookies and detach the import; otherwise
 * flush batched cancel cookies out immediately. */
int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp)
{
        int rc = 0;
        ENTRY;

        if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {
                /* NOTE(review): loc_llcd is read for the debug message
                 * before loc_sem is taken - confirm this is benign */
                CDEBUG(D_RPCTRACE,"reverse import disconnect, put llcd %p:%p\n",
                       ctxt->loc_llcd, ctxt);
                mutex_down(&ctxt->loc_sem);
                if (ctxt->loc_llcd != NULL) {
                        llcd_put(ctxt->loc_llcd);
                        ctxt->loc_llcd = NULL;
                }
                ctxt->loc_imp = NULL;
                mutex_up(&ctxt->loc_sem);
        } else {
                /* force any batched cookies out now */
                rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW);
        }

        RETURN(rc);
}
218 EXPORT_SYMBOL(llog_obd_repl_sync);
219
/* Account one exiting commit thread and wake waiters on lcm_waitq
 * (llog_cleanup_commit_master() waits for lcm_thread_total == 0). */
static void llog_lcm_dec(struct llog_commit_master *lcm)
{
        atomic_dec(&lcm->lcm_thread_total);
        cfs_waitq_signal(&lcm->lcm_waitq);
}
225
/* Main loop of one log commit daemon: collects pending llcds destined
 * for the same import into a private batch and sends them as
 * OBD_LOG_CANCEL RPCs; on failure the remainder is parked on the
 * resend list for a later pass. */
static int log_commit_thread(void *arg)
{
        struct llog_commit_daemon *lcd = arg;
        struct llog_commit_master *lcm = lcd->lcd_lcm;
        struct llog_canceld_ctxt *llcd, *n;
        struct obd_import *import = NULL;
        ENTRY;

        THREAD_NAME(cfs_curproc_comm(), CFS_CURPROC_COMM_MAX - 1,
                    "ll_log_comt_%02d", lcd->lcd_index);

        ptlrpc_daemonize(cfs_curproc_comm()); /* thread never needs to do IO */
        CDEBUG(D_HA, "%s started\n", cfs_curproc_comm());

        do {
                struct ptlrpc_request *request;
                struct list_head *sending_list;
                int rc = 0;

                /* drop the import reference held from the last iteration */
                if (import)
                        class_import_put(import);
                import = NULL;

                /* If we do not have enough pages available, allocate some */
                while (atomic_read(&lcm->lcm_llcd_numfree) <
                       lcm->lcm_llcd_minfree) {
                        if (llcd_alloc(lcm) < 0)
                                break;
                }

                /* mark ourselves idle while waiting for work */
                spin_lock(&lcm->lcm_thread_lock);
                atomic_inc(&lcm->lcm_thread_numidle);
                list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_idle);
                spin_unlock(&lcm->lcm_thread_lock);

                wait_event_interruptible(lcm->lcm_waitq,
                                         !list_empty(&lcm->lcm_llcd_pending) ||
                                         lcm->lcm_flags & LLOG_LCM_FL_EXIT);

                /* If we are the last available thread, start a new one in case
                 * we get blocked on an RPC (nobody else will start a new one)*/
                spin_lock(&lcm->lcm_thread_lock);
                atomic_dec(&lcm->lcm_thread_numidle);
                list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_busy);
                spin_unlock(&lcm->lcm_thread_lock);

                sending_list = &lcm->lcm_llcd_pending;
        resend:
                if (import)
                        class_import_put(import);
                import = NULL;
                if (lcm->lcm_flags & LLOG_LCM_FL_EXIT) {
                        /* exiting: stop caching llcds and spawning threads;
                         * leave now if drained or if exit is forced */
                        lcm->lcm_llcd_maxfree = 0;
                        lcm->lcm_llcd_minfree = 0;
                        lcm->lcm_thread_max = 0;

                        if (list_empty(&lcm->lcm_llcd_pending) ||
                            lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE)
                                break;
                }

                if (atomic_read(&lcm->lcm_thread_numidle) <= 1 &&
                    atomic_read(&lcm->lcm_thread_total) < lcm->lcm_thread_max) {
                        rc = llog_start_commit_thread(lcm);
                        if (rc < 0)
                                CERROR("error starting thread: rc %d\n", rc);
                }

                /* Move all of the pending cancels from the same OST off of
                 * the list, so we don't get multiple threads blocked and/or
                 * doing upcalls on the same OST in case of failure. */
                spin_lock(&lcm->lcm_llcd_lock);
                if (!list_empty(sending_list)) {
                        list_move_tail(sending_list->next,
                                       &lcd->lcd_llcd_list);
                        llcd = list_entry(lcd->lcd_llcd_list.next,
                                          typeof(*llcd), llcd_list);
                        LASSERT(llcd->llcd_lcm == lcm);
                        /* pin the import this whole batch will target */
                        import = llcd->llcd_ctxt->loc_imp;
                        if (import)
                                class_import_get(import);
                }
                /* gather everything else on sending_list for that import */
                list_for_each_entry_safe(llcd, n, sending_list, llcd_list) {
                        LASSERT(llcd->llcd_lcm == lcm);
                        if (import == llcd->llcd_ctxt->loc_imp)
                                list_move_tail(&llcd->llcd_list,
                                               &lcd->lcd_llcd_list);
                }
                /* also pick up matching entries from the resend list,
                 * unless that is the list we are already draining */
                if (sending_list != &lcm->lcm_llcd_resend) {
                        list_for_each_entry_safe(llcd, n, &lcm->lcm_llcd_resend,
                                                 llcd_list) {
                                LASSERT(llcd->llcd_lcm == lcm);
                                if (import == llcd->llcd_ctxt->loc_imp)
                                        list_move_tail(&llcd->llcd_list,
                                                       &lcd->lcd_llcd_list);
                        }
                }
                spin_unlock(&lcm->lcm_llcd_lock);

                /* We are the only one manipulating our local list - no lock */
                list_for_each_entry_safe(llcd,n, &lcd->lcd_llcd_list,llcd_list){
                        int size[2] = { sizeof(struct ptlrpc_body),
                                        llcd->llcd_cookiebytes };
                        char *bufs[2] = { NULL, (char *)llcd->llcd_cookies };

                        list_del(&llcd->llcd_list);
                        if (llcd->llcd_cookiebytes == 0) {
                                CDEBUG(D_RPCTRACE, "put empty llcd %p:%p\n",
                                       llcd, llcd->llcd_ctxt);
                                llcd_put(llcd);
                                continue;
                        }

                        /* drop the llcd if its ctxt has lost its import */
                        mutex_down(&llcd->llcd_ctxt->loc_sem);
                        if (llcd->llcd_ctxt->loc_imp == NULL) {
                                mutex_up(&llcd->llcd_ctxt->loc_sem);
                                CWARN("import will be destroyed, put "
                                      "llcd %p:%p\n", llcd, llcd->llcd_ctxt);
                                llcd_put(llcd);
                                continue;
                        }
                        mutex_up(&llcd->llcd_ctxt->loc_sem);

                        if (!import || (import == LP_POISON) ||
                            (import->imp_client == LP_POISON)) {
                                CERROR("No import %p (llcd=%p, ctxt=%p)\n",
                                       import, llcd, llcd->llcd_ctxt);
                                llcd_put(llcd);
                                continue;
                        }

                        OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_RECOV, 10);

                        request = ptlrpc_prep_req(import, LUSTRE_LOG_VERSION,
                                                  OBD_LOG_CANCEL, 2, size,bufs);
                        if (request == NULL) {
                                rc = -ENOMEM;
                                CERROR("error preparing commit: rc %d\n", rc);

                                /* park the rest of the batch for resend */
                                spin_lock(&lcm->lcm_llcd_lock);
                                list_splice(&lcd->lcd_llcd_list,
                                            &lcm->lcm_llcd_resend);
                                CFS_INIT_LIST_HEAD(&lcd->lcd_llcd_list);
                                spin_unlock(&lcm->lcm_llcd_lock);
                                break;
                        }

                        /* XXX FIXME bug 249, 5515 */
                        request->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
                        request->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;

                        ptlrpc_req_set_repsize(request, 1, NULL);
                        /* re-check the import under loc_sem before sending */
                        mutex_down(&llcd->llcd_ctxt->loc_sem);
                        if (llcd->llcd_ctxt->loc_imp == NULL) {
                                mutex_up(&llcd->llcd_ctxt->loc_sem);
                                CWARN("import will be destroyed, put "
                                      "llcd %p:%p\n", llcd, llcd->llcd_ctxt);
                                llcd_put(llcd);
                                ptlrpc_req_finished(request);
                                continue;
                        }
                        mutex_up(&llcd->llcd_ctxt->loc_sem);
                        rc = ptlrpc_queue_wait(request);
                        ptlrpc_req_finished(request);

                        /* If the RPC failed, we put this and the remaining
                         * messages onto the resend list for another time. */
                        if (rc == 0) {
                                llcd_put(llcd);
                                continue;
                        }

                        CERROR("commit %p:%p drop %d cookies: rc %d\n",
                               llcd, llcd->llcd_ctxt,
                               (int)(llcd->llcd_cookiebytes /
                                     sizeof(*llcd->llcd_cookies)), rc);
                        llcd_put(llcd);
                }

                if (rc == 0) {
                        /* pending batch sent cleanly - drain the resend
                         * list with the same machinery */
                        sending_list = &lcm->lcm_llcd_resend;
                        if (!list_empty(sending_list))
                                goto resend;
                }
        } while(1);

        if (import)
                class_import_put(import);

        /* If we are force exiting, just drop all of the cookies. */
        if (lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE) {
                spin_lock(&lcm->lcm_llcd_lock);
                list_splice(&lcm->lcm_llcd_pending, &lcd->lcd_llcd_list);
                list_splice(&lcm->lcm_llcd_resend, &lcd->lcd_llcd_list);
                list_splice(&lcm->lcm_llcd_free, &lcd->lcd_llcd_list);
                spin_unlock(&lcm->lcm_llcd_lock);

                list_for_each_entry_safe(llcd, n, &lcd->lcd_llcd_list,llcd_list)
                        llcd_put(llcd);
        }


        CDEBUG(D_HA, "%s exiting\n", cfs_curproc_comm());

        /* unhook ourselves from the master and account our exit */
        spin_lock(&lcm->lcm_thread_lock);
        list_del(&lcd->lcd_lcm_list);
        spin_unlock(&lcm->lcm_thread_lock);
        OBD_FREE_PTR(lcd);
        llog_lcm_dec(lcm);

        RETURN(0);
}
438
439 int llog_start_commit_thread(struct llog_commit_master *lcm)
440 {
441         struct llog_commit_daemon *lcd;
442         int rc, index; 
443         ENTRY;
444
445         if (atomic_read(&lcm->lcm_thread_total) >= lcm->lcm_thread_max)
446                 RETURN(0);
447
448         /* Check whether it will be cleanup llog commit thread first,
449          * If not, increate the lcm_thread_total count to prevent the 
450          * lcm being freed when the log_commit_thread is started */
451         spin_lock(&lcm->lcm_thread_lock);
452         if (!lcm->lcm_flags & LLOG_LCM_FL_EXIT) { 
453                 atomic_inc(&lcm->lcm_thread_total);
454                 index = atomic_read(&lcm->lcm_thread_total);
455                 spin_unlock(&lcm->lcm_thread_lock);
456         } else {
457                 spin_unlock(&lcm->lcm_thread_lock);
458                 RETURN(0);
459         }
460
461         OBD_ALLOC_PTR(lcd);
462         if (lcd == NULL)
463                 GOTO(cleanup, rc = -ENOMEM);
464
465         CFS_INIT_LIST_HEAD(&lcd->lcd_lcm_list);
466         CFS_INIT_LIST_HEAD(&lcd->lcd_llcd_list);
467         lcd->lcd_index = index;
468         lcd->lcd_lcm = lcm;
469
470         rc = cfs_kernel_thread(log_commit_thread, lcd, CLONE_VM | CLONE_FILES);
471 cleanup:
472         if (rc < 0) {
473                 CERROR("error starting thread #%d: %d\n", lcd->lcd_index, rc);
474                 llog_lcm_dec(lcm);
475                 if (lcd) 
476                         OBD_FREE_PTR(lcd);
477                 RETURN(rc);
478         }
479         RETURN(0);
480 }
481 EXPORT_SYMBOL(llog_start_commit_thread);
482
/* Argument block handed from llog_recovery_generic() to the spawned
 * log_process_thread(); llpa_sem serializes use of this single static
 * instance until the thread has copied the fields out. */
static struct llog_process_args {
        struct semaphore         llpa_sem;
        struct llog_ctxt        *llpa_ctxt;  /* ctxt ref, dropped by the thread */
        void                    *llpa_cb;    /* cast to llog_cb_t by the thread */
        void                    *llpa_arg;   /* cast to struct llog_logid * */
} llpa;
489
490 int llog_init_commit_master(struct llog_commit_master *lcm)
491 {
492         CFS_INIT_LIST_HEAD(&lcm->lcm_thread_busy);
493         CFS_INIT_LIST_HEAD(&lcm->lcm_thread_idle);
494         spin_lock_init(&lcm->lcm_thread_lock);
495         atomic_set(&lcm->lcm_thread_numidle, 0);
496         cfs_waitq_init(&lcm->lcm_waitq);
497         CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_pending);
498         CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_resend);
499         CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_free);
500         spin_lock_init(&lcm->lcm_llcd_lock);
501         atomic_set(&lcm->lcm_llcd_numfree, 0);
502         lcm->lcm_llcd_minfree = 0;
503         lcm->lcm_thread_max = 5;
504         /* FIXME initialize semaphore for llog_process_args */
505         sema_init(&llpa.llpa_sem, 1);
506         return 0;
507 }
508 EXPORT_SYMBOL(llog_init_commit_master);
509
510 int llog_cleanup_commit_master(struct llog_commit_master *lcm,
511                                int force)
512 {
513         spin_lock(&lcm->lcm_thread_lock);
514         lcm->lcm_flags |= LLOG_LCM_FL_EXIT;
515         if (force)
516                 lcm->lcm_flags |= LLOG_LCM_FL_EXIT_FORCE;
517         
518         spin_unlock(&lcm->lcm_thread_lock);
519         
520         cfs_waitq_signal(&lcm->lcm_waitq);
521
522         wait_event_interruptible(lcm->lcm_waitq,
523                                  atomic_read(&lcm->lcm_thread_total) == 0);
524         return 0;
525 }
526 EXPORT_SYMBOL(llog_cleanup_commit_master);
527
/* One-shot recovery thread: processes the catalog llog named in llpa
 * with the supplied callback, then forces any batched cancel cookies
 * out via llog_sync(). */
static int log_process_thread(void *args)
{
        struct llog_process_args *data = args;
        struct llog_ctxt *ctxt = data->llpa_ctxt;
        void   *cb = data->llpa_cb;
        struct llog_logid logid = *(struct llog_logid *)(data->llpa_arg);
        struct llog_handle *llh = NULL;
        int rc;
        ENTRY;

        /* all fields copied out above - let the next caller reuse llpa */
        mutex_up(&data->llpa_sem);
        ptlrpc_daemonize("llog_process");     /* thread does IO to log files */

        rc = llog_create(ctxt, &llh, &logid, NULL);
        if (rc) {
                CERROR("llog_create failed %d\n", rc);
                GOTO(out, rc);
        }
        rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL);
        if (rc) {
                CERROR("llog_init_handle failed %d\n", rc);
                GOTO(release_llh, rc);
        }

        if (cb) {
                /* LLOG_PROC_BREAK is an expected early stop, not an error */
                rc = llog_cat_process(llh, (llog_cb_t)cb, NULL);
                if (rc != LLOG_PROC_BREAK)
                        CERROR("llog_cat_process failed %d\n", rc);
        } else {
                CWARN("no callback function for recovery\n");
        }

        CDEBUG(D_HA, "send llcd %p:%p forcibly after recovery\n",
               ctxt->loc_llcd, ctxt);
        llog_sync(ctxt, NULL);

release_llh:
        rc = llog_cat_put(llh);
        if (rc)
                CERROR("llog_cat_put failed %d\n", rc);
out:
        /* drop the ctxt reference taken in llog_recovery_generic() */
        llog_ctxt_put(ctxt);
        RETURN(rc);
}
572
573 static int llog_recovery_generic(struct llog_ctxt *ctxt, void *handle,void *arg)
574 {
575         struct obd_device *obd = ctxt->loc_obd;
576         int rc;
577         ENTRY;
578
579         if (obd->obd_stopping)
580                 RETURN(-ENODEV);
581
582         mutex_down(&llpa.llpa_sem);
583         llpa.llpa_cb = handle;
584         llpa.llpa_arg = arg;
585         llpa.llpa_ctxt = llog_get_context(ctxt->loc_obd, ctxt->loc_idx);
586         if (!llpa.llpa_ctxt) {
587                 up(&llpa.llpa_sem);
588                 RETURN(-ENODEV);
589         }
590         rc = cfs_kernel_thread(log_process_thread, &llpa, CLONE_VM | CLONE_FILES);
591         if (rc < 0)
592                 CERROR("error starting log_process_thread: %d\n", rc);
593         else {
594                 CDEBUG(D_HA, "log_process_thread: %d\n", rc);
595                 rc = 0;
596         }
597
598         RETURN(rc);
599 }
600
/* Prepare a replication ctxt after (re)connect: record the new
 * generation, attach a fresh llcd, and replay the catalog identified
 * by logid through the registered llog_proc_cb. */
int llog_repl_connect(struct llog_ctxt *ctxt, int count,
                      struct llog_logid *logid, struct llog_gen *gen,
                      struct obd_uuid *uuid)
{
        struct llog_canceld_ctxt *llcd;
        int rc;
        ENTRY;

        /* send back llcd before recovery from llog */
        if (ctxt->loc_llcd != NULL) {
                CWARN("llcd %p:%p not empty\n", ctxt->loc_llcd, ctxt);
                llog_sync(ctxt, NULL);
        }

        mutex_down(&ctxt->loc_sem);
        ctxt->loc_gen = *gen;
        llcd = llcd_grab(ctxt->loc_lcm);
        if (llcd == NULL) {
                CERROR("couldn't get an llcd\n");
                mutex_up(&ctxt->loc_sem);
                RETURN(-ENOMEM);
        }
        /* the llcd pins the ctxt until it is put */
        llcd->llcd_ctxt = llog_ctxt_get(ctxt);
        ctxt->loc_llcd = llcd;
        mutex_up(&ctxt->loc_sem);

        rc = llog_recovery_generic(ctxt, ctxt->llog_proc_cb, logid);
        if (rc != 0)
                CERROR("error recovery process: %d\n", rc);

        RETURN(rc);
}
633 EXPORT_SYMBOL(llog_repl_connect);
634
635 #else /* !__KERNEL__ */
636
/* Userspace (liblustre) build: cancel batching is a no-op. */
int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
                         struct lov_stripe_md *lsm, int count,
                         struct llog_cookie *cookies, int flags)
{
        return 0;
}
643 #endif