Whamcloud - gitweb
b=16098
[fs/lustre-release.git] / lustre / ptlrpc / recov_thread.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see [sun.com URL with a
20  * copy of GPLv2].
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ptlrpc/recov_thread.c
37  *
38  * OST<->MDS recovery logging thread.
39  * Invariants in implementation:
40  * - we do not share logs among different OST<->MDS connections, so that
41  *   if an OST or MDS fails it need only look at log(s) relevant to itself
42  *
43  * Author: Andreas Dilger <adilger@clusterfs.com>
44  */
45
46 #define DEBUG_SUBSYSTEM S_LOG
47
48 #ifndef EXPORT_SYMTAB
49 # define EXPORT_SYMTAB
50 #endif
51
52 #ifdef __KERNEL__
53 # include <libcfs/libcfs.h>
54 #else
55 # include <libcfs/list.h>
56 # include <liblustre.h>
57 #endif
58
59 #include <obd_class.h>
60 #include <lustre_commit_confd.h>
61 #include <obd_support.h>
62 #include <obd_class.h>
63 #include <lustre_net.h>
64 #include <lnet/types.h>
65 #include <libcfs/list.h>
66 #include <lustre_log.h>
67 #include "ptlrpc_internal.h"
68
69 #ifdef __KERNEL__
70
71 /* Allocate new commit structs in case we do not have enough.
72  * Make the llcd size small enough that it fits into a single page when we
73  * are sending/receiving it. */
74 static int llcd_alloc(struct llog_commit_master *lcm)
75 {
76         struct llog_canceld_ctxt *llcd;
77         int llcd_size;
78
79         /* payload of lustre_msg V2 is bigger */
80         llcd_size = 4096 - lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL);
81         OBD_ALLOC(llcd,
82                   llcd_size + offsetof(struct llog_canceld_ctxt, llcd_cookies));
83         if (llcd == NULL)
84                 return -ENOMEM;
85
86         llcd->llcd_size = llcd_size;
87         llcd->llcd_lcm = lcm;
88
89         spin_lock(&lcm->lcm_llcd_lock);
90         list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);
91         atomic_inc(&lcm->lcm_llcd_numfree);
92         spin_unlock(&lcm->lcm_llcd_lock);
93
94         return 0;
95 }
96
97 /* Get a free cookie struct from the list */
98 static struct llog_canceld_ctxt *llcd_grab(struct llog_commit_master *lcm)
99 {
100         struct llog_canceld_ctxt *llcd;
101
102 repeat:
103         spin_lock(&lcm->lcm_llcd_lock);
104         if (list_empty(&lcm->lcm_llcd_free)) {
105                 spin_unlock(&lcm->lcm_llcd_lock);
106                 if (llcd_alloc(lcm) < 0) {
107                         CERROR("unable to allocate log commit data!\n");
108                         return NULL;
109                 }
110                 /* check new llcd wasn't grabbed while lock dropped, b=7407 */
111                 goto repeat;
112         }
113
114         llcd = list_entry(lcm->lcm_llcd_free.next, typeof(*llcd), llcd_list);
115         list_del(&llcd->llcd_list);
116         atomic_dec(&lcm->lcm_llcd_numfree);
117         spin_unlock(&lcm->lcm_llcd_lock);
118
119         llcd->llcd_cookiebytes = 0;
120
121         return llcd;
122 }
123
124 static void llcd_put(struct llog_canceld_ctxt *llcd)
125 {
126         struct llog_commit_master *lcm = llcd->llcd_lcm;
127
128         llog_ctxt_put(llcd->llcd_ctxt);
129         if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) {
130                 int llcd_size = llcd->llcd_size +
131                          offsetof(struct llog_canceld_ctxt, llcd_cookies);
132                 OBD_FREE(llcd, llcd_size);
133         } else {
134                 spin_lock(&lcm->lcm_llcd_lock);
135                 list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);
136                 atomic_inc(&lcm->lcm_llcd_numfree);
137                 spin_unlock(&lcm->lcm_llcd_lock);
138         }
139 }
140
141 /* Send some cookies to the appropriate target */
142 static void llcd_send(struct llog_canceld_ctxt *llcd)
143 {
144         if (!(llcd->llcd_lcm->lcm_flags & LLOG_LCM_FL_EXIT)) {
145                 spin_lock(&llcd->llcd_lcm->lcm_llcd_lock);
146                 list_add_tail(&llcd->llcd_list,
147                               &llcd->llcd_lcm->lcm_llcd_pending);
148                 spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock);
149         }
150         cfs_waitq_signal_nr(&llcd->llcd_lcm->lcm_waitq, 1);
151 }
152
153 /**
154  * Grab llcd and assign it to passed @ctxt. Also set up backward link
155  * and get ref on @ctxt.
156  */
157 static struct llog_canceld_ctxt *ctxt_llcd_grab(struct llog_ctxt *ctxt)
158 {
159         struct llog_canceld_ctxt *llcd;
160
161         LASSERT_SEM_LOCKED(&ctxt->loc_sem);
162         llcd = llcd_grab(ctxt->loc_lcm);
163         if (llcd == NULL)
164                 return NULL;
165
166         llcd->llcd_ctxt = llog_ctxt_get(ctxt);
167         ctxt->loc_llcd = llcd;
168
169         CDEBUG(D_RPCTRACE,"grab llcd %p:%p\n", ctxt->loc_llcd, ctxt);
170         return llcd;
171 }
172
173 /**
174  * Put llcd in passed @ctxt. Set ->loc_llcd to NULL.
175  */
176 static void ctxt_llcd_put(struct llog_ctxt *ctxt)
177 {
178         mutex_down(&ctxt->loc_sem);
179         if (ctxt->loc_llcd != NULL) {
180                 CDEBUG(D_RPCTRACE,"put llcd %p:%p\n", ctxt->loc_llcd, ctxt);
181                 llcd_put(ctxt->loc_llcd);
182                 ctxt->loc_llcd = NULL;
183         }
184         if (ctxt->loc_imp) {
185                 class_import_put(ctxt->loc_imp);
186                 ctxt->loc_imp = NULL;
187         }
188         mutex_up(&ctxt->loc_sem);
189 }
190
/* deleted objects have a commit callback that cancels the MDS
 * log record for the deletion.  The commit callback calls this
 * function.
 *
 * Cookies are buffered in ctxt->loc_llcd (a fresh llcd is grabbed on
 * first use) and the batch is handed to llcd_send() once less than one
 * cookie's worth of space remains, or when OBD_LLOG_FL_SENDNOW is set.
 * Called with @count == 0 / @cookies == NULL it acts as a pure flush.
 * Returns 0, or -ENOMEM when no llcd could be grabbed.
 *
 * NOTE(review): only the first cookie is copied even when @count > 1;
 * callers appear to always pass count == 1 — confirm.
 */
int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
                         struct lov_stripe_md *lsm, int count,
                         struct llog_cookie *cookies, int flags)
{
        struct llog_canceld_ctxt *llcd;
        int rc = 0;
        ENTRY;

        LASSERT(ctxt);

        mutex_down(&ctxt->loc_sem);
        llcd = ctxt->loc_llcd;

        /* import already gone (e.g. disconnect): drop the cookies,
         * cancels will be redone on recovery */
        if (ctxt->loc_imp == NULL) {
                CDEBUG(D_RPCTRACE, "no import for ctxt %p\n", ctxt);
                GOTO(out, rc = 0);
        }

        if (count > 0 && cookies != NULL) {
                if (llcd == NULL) {
                        llcd = ctxt_llcd_grab(ctxt);
                        if (llcd == NULL) {
                                CERROR("couldn't get an llcd - dropped "LPX64
                                       ":%x+%u\n",
                                       cookies->lgc_lgl.lgl_oid,
                                       cookies->lgc_lgl.lgl_ogen,
                                       cookies->lgc_index);
                                GOTO(out, rc = -ENOMEM);
                        }
                }

                /* there is always room for one more cookie: a (nearly)
                 * full llcd was already sent by the check below */
                memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes,
                       cookies, sizeof(*cookies));
                llcd->llcd_cookiebytes += sizeof(*cookies);
        } else {
                /* nothing buffered, or no flush requested */
                if (llcd == NULL || !(flags & OBD_LLOG_FL_SENDNOW))
                        GOTO(out, rc);
        }

        /* send when full or on explicit request; loc_llcd is detached so
         * the next cancel grabs a fresh one */
        if ((llcd->llcd_size - llcd->llcd_cookiebytes) < sizeof(*cookies) ||
            (flags & OBD_LLOG_FL_SENDNOW)) {
                CDEBUG(D_RPCTRACE, "send llcd %p:%p\n", llcd, llcd->llcd_ctxt);
                ctxt->loc_llcd = NULL;
                llcd_send(llcd);
        }
out:
        mutex_up(&ctxt->loc_sem);
        return rc;
}
EXPORT_SYMBOL(llog_obd_repl_cancel);
245
246 int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp)
247 {
248         int rc = 0;
249         ENTRY;
250
251         if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {
252                 CDEBUG(D_RPCTRACE,"reverse import disconnect\n");
253                 /* 
254                  * We put llcd because it is not going to sending list and
255                  * thus, its refc will not be handled. We will handle it here.
256                  */
257                 ctxt_llcd_put(ctxt);
258         } else {
259                 /* 
260                  * Sending cancel. This means that ctxt->loc_llcd wil be
261                  * put on sending list in llog_obd_repl_cancel() and in
262                  * this case recovery thread will take care of it refc.
263                  */
264                 rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW);
265         }
266         RETURN(rc);
267 }
268 EXPORT_SYMBOL(llog_obd_repl_sync);
269
270 static inline void stop_log_commit(struct llog_commit_master *lcm,
271                                    struct llog_commit_daemon *lcd,
272                                    int rc)
273 {
274         CERROR("error preparing commit: rc %d\n", rc);
275
276         spin_lock(&lcm->lcm_llcd_lock);
277         list_splice_init(&lcd->lcd_llcd_list, &lcm->lcm_llcd_resend);
278         spin_unlock(&lcm->lcm_llcd_lock);
279 }
280
/* Main loop of a cancel-commit daemon thread.
 *
 * Each thread pulls a batch of llcds off lcm->lcm_llcd_pending, grouped
 * by import so that one slow/failed target does not block cancels to
 * the others, packs their cookies into OBD_LOG_CANCEL RPCs and sends
 * them.  llcds whose RPC pack failed are parked on lcm->lcm_llcd_resend
 * and retried.  The loop exits once LLOG_LCM_FL_EXIT is set and the
 * pending list is drained (immediately on LLOG_LCM_FL_EXIT_FORCE,
 * dropping all cookies). */
static int log_commit_thread(void *arg)
{
        struct llog_commit_master *lcm = arg;
        struct llog_commit_daemon *lcd;
        struct llog_canceld_ctxt *llcd, *n;
        struct obd_import *import = NULL;
        ENTRY;

        OBD_ALLOC(lcd, sizeof(*lcd));
        if (lcd == NULL)
                RETURN(-ENOMEM);

        /* name the thread after its index before making it visible */
        spin_lock(&lcm->lcm_thread_lock);
        THREAD_NAME(cfs_curproc_comm(), CFS_CURPROC_COMM_MAX - 1,
                    "ll_log_comt_%02d", atomic_read(&lcm->lcm_thread_total));
        atomic_inc(&lcm->lcm_thread_total);
        spin_unlock(&lcm->lcm_thread_lock);

        ptlrpc_daemonize(cfs_curproc_comm()); /* thread never needs to do IO */

        CFS_INIT_LIST_HEAD(&lcd->lcd_lcm_list);
        CFS_INIT_LIST_HEAD(&lcd->lcd_llcd_list);
        lcd->lcd_lcm = lcm;

        CDEBUG(D_HA, "%s started\n", cfs_curproc_comm());
        do {
                struct ptlrpc_request *request;
                struct list_head *sending_list;
                int rc = 0;

                /* drop the import ref taken on the previous iteration */
                if (import)
                        class_import_put(import);
                import = NULL;

                /* If we do not have enough pages available, allocate some */
                while (atomic_read(&lcm->lcm_llcd_numfree) <
                       lcm->lcm_llcd_minfree) {
                        if (llcd_alloc(lcm) < 0)
                                break;
                }

                /* park ourselves on the idle list while waiting for work */
                spin_lock(&lcm->lcm_thread_lock);
                atomic_inc(&lcm->lcm_thread_numidle);
                list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_idle);
                spin_unlock(&lcm->lcm_thread_lock);

                wait_event_interruptible(lcm->lcm_waitq,
                                         !list_empty(&lcm->lcm_llcd_pending) ||
                                         lcm->lcm_flags & LLOG_LCM_FL_EXIT);

                /* If we are the last available thread, start a new one in case
                 * we get blocked on an RPC (nobody else will start a new one)*/
                spin_lock(&lcm->lcm_thread_lock);
                atomic_dec(&lcm->lcm_thread_numidle);
                list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_busy);
                spin_unlock(&lcm->lcm_thread_lock);

                sending_list = &lcm->lcm_llcd_pending;
        resend:
                /* a resend pass may target a different import than the
                 * pending pass did - re-resolve it below */
                if (import)
                        class_import_put(import);
                import = NULL;
                if (lcm->lcm_flags & LLOG_LCM_FL_EXIT) {
                        /* stop caching llcds and spawning new threads;
                         * drain what is pending unless forced to exit */
                        lcm->lcm_llcd_maxfree = 0;
                        lcm->lcm_llcd_minfree = 0;
                        lcm->lcm_thread_max = 0;

                        if (list_empty(&lcm->lcm_llcd_pending) ||
                            lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE)
                                break;
                }

                if (atomic_read(&lcm->lcm_thread_numidle) <= 1 &&
                    atomic_read(&lcm->lcm_thread_total) < lcm->lcm_thread_max) {
                        rc = llog_start_commit_thread(lcm);
                        if (rc < 0)
                                CERROR("error starting thread: rc %d\n", rc);
                }

                /* Move all of the pending cancels from the same OST off of
                 * the list, so we don't get multiple threads blocked and/or
                 * doing upcalls on the same OST in case of failure. */
                spin_lock(&lcm->lcm_llcd_lock);
                if (!list_empty(sending_list)) {
                        list_move_tail(sending_list->next,
                                       &lcd->lcd_llcd_list);
                        llcd = list_entry(lcd->lcd_llcd_list.next,
                                          typeof(*llcd), llcd_list);
                        LASSERT(llcd->llcd_lcm == lcm);
                        /* pin the import while we work on its llcds */
                        import = llcd->llcd_ctxt->loc_imp;
                        if (import)
                                class_import_get(import);
                }
                /* batch every other llcd headed for the same import */
                list_for_each_entry_safe(llcd, n, sending_list, llcd_list) {
                        LASSERT(llcd->llcd_lcm == lcm);
                        if (import == llcd->llcd_ctxt->loc_imp)
                                list_move_tail(&llcd->llcd_list,
                                               &lcd->lcd_llcd_list);
                }
                /* also pick up earlier failures bound for this import */
                if (sending_list != &lcm->lcm_llcd_resend) {
                        list_for_each_entry_safe(llcd, n, &lcm->lcm_llcd_resend,
                                                 llcd_list) {
                                LASSERT(llcd->llcd_lcm == lcm);
                                if (import == llcd->llcd_ctxt->loc_imp)
                                        list_move_tail(&llcd->llcd_list,
                                                       &lcd->lcd_llcd_list);
                        }
                }
                spin_unlock(&lcm->lcm_llcd_lock);

                /* We are the only one manipulating our local list - no lock */
                list_for_each_entry_safe(llcd,n, &lcd->lcd_llcd_list,llcd_list){
                        char *bufs[2] = { NULL, (char *)llcd->llcd_cookies };

                        list_del(&llcd->llcd_list);
                        if (llcd->llcd_cookiebytes == 0) {
                                CDEBUG(D_RPCTRACE, "put empty llcd %p:%p\n",
                                       llcd, llcd->llcd_ctxt);
                                llcd_put(llcd);
                                continue;
                        }

                        /* import may have been cleaned up since this llcd
                         * was queued - drop it rather than send */
                        mutex_down(&llcd->llcd_ctxt->loc_sem);
                        if (llcd->llcd_ctxt->loc_imp == NULL) {
                                mutex_up(&llcd->llcd_ctxt->loc_sem);
                                CWARN("import will be destroyed, put "
                                      "llcd %p:%p\n", llcd, llcd->llcd_ctxt);
                                llcd_put(llcd);
                                continue;
                        }
                        mutex_up(&llcd->llcd_ctxt->loc_sem);

                        /* guard against a freed/poisoned import structure */
                        if (!import || (import == LP_POISON) ||
                            (import->imp_client == LP_POISON)) {
                                CERROR("No import %p (llcd=%p, ctxt=%p)\n",
                                       import, llcd, llcd->llcd_ctxt);
                                llcd_put(llcd);
                                continue;
                        }

                        OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_RECOV, 10);

                        request = ptlrpc_request_alloc(import, &RQF_LOG_CANCEL);
                        if (request == NULL) {
                                rc = -ENOMEM;
                                stop_log_commit(lcm, lcd, rc);
                                break;
                        }

                        req_capsule_set_size(&request->rq_pill, &RMF_LOGCOOKIES,
                                             RCL_CLIENT,llcd->llcd_cookiebytes);

                        rc = ptlrpc_request_bufs_pack(request,
                                                      LUSTRE_LOG_VERSION,
                                                      OBD_LOG_CANCEL, bufs,
                                                      NULL);
                        if (rc) {
                                ptlrpc_request_free(request);
                                stop_log_commit(lcm, lcd, rc);
                                break;
                        }

                        /* bug 5515 */
                        request->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
                        request->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
                        ptlrpc_at_set_req_timeout(request);

                        ptlrpc_request_set_replen(request);
                        /* re-check the import under loc_sem right before
                         * the blocking send */
                        mutex_down(&llcd->llcd_ctxt->loc_sem);
                        if (llcd->llcd_ctxt->loc_imp == NULL) {
                                mutex_up(&llcd->llcd_ctxt->loc_sem);
                                CWARN("import will be destroyed, put "
                                      "llcd %p:%p\n", llcd, llcd->llcd_ctxt);
                                llcd_put(llcd);
                                ptlrpc_req_finished(request);
                                continue;
                        }
                        mutex_up(&llcd->llcd_ctxt->loc_sem);
                        rc = ptlrpc_queue_wait(request);
                        ptlrpc_req_finished(request);

                        /* If the RPC failed, we put this and the remaining
                         * messages onto the resend list for another time. */
                        if (rc == 0) {
                                llcd_put(llcd);
                                continue;
                        }

                        CERROR("commit %p:%p drop %d cookies: rc %d\n",
                               llcd, llcd->llcd_ctxt,
                               (int)(llcd->llcd_cookiebytes /
                                     sizeof(*llcd->llcd_cookies)), rc);
                        llcd_put(llcd);
                }

                /* pending pass finished cleanly - retry earlier failures */
                if (rc == 0) {
                        sending_list = &lcm->lcm_llcd_resend;
                        if (!list_empty(sending_list))
                                goto resend;
                }
        } while(1);

        if (import)
                class_import_put(import);

        /* If we are force exiting, just drop all of the cookies. */
        if (lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE) {
                spin_lock(&lcm->lcm_llcd_lock);
                list_splice_init(&lcm->lcm_llcd_pending, &lcd->lcd_llcd_list);
                list_splice_init(&lcm->lcm_llcd_resend, &lcd->lcd_llcd_list);
                list_splice_init(&lcm->lcm_llcd_free, &lcd->lcd_llcd_list);
                spin_unlock(&lcm->lcm_llcd_lock);

                list_for_each_entry_safe(llcd, n, &lcd->lcd_llcd_list,llcd_list)
                        llcd_put(llcd);
        }

        spin_lock(&lcm->lcm_thread_lock);
        list_del(&lcd->lcd_lcm_list);
        spin_unlock(&lcm->lcm_thread_lock);
        OBD_FREE(lcd, sizeof(*lcd));

        CDEBUG(D_HA, "%s exiting\n", cfs_curproc_comm());

        spin_lock(&lcm->lcm_thread_lock);
        atomic_dec(&lcm->lcm_thread_total);
        spin_unlock(&lcm->lcm_thread_lock);
        /* wake llog_cleanup_commit_master() waiting for thread_total == 0 */
        cfs_waitq_signal(&lcm->lcm_waitq);

        return 0;
}
512
513 int llog_start_commit_thread(struct llog_commit_master *lcm)
514 {
515         int rc;
516         ENTRY;
517
518         if (atomic_read(&lcm->lcm_thread_total) >= lcm->lcm_thread_max)
519                 RETURN(0);
520
521         rc = cfs_kernel_thread(log_commit_thread, lcm, CLONE_VM | CLONE_FILES);
522         if (rc < 0) {
523                 CERROR("error starting thread #%d: %d\n",
524                        atomic_read(&lcm->lcm_thread_total), rc);
525                 RETURN(rc);
526         }
527
528         RETURN(0);
529 }
530 EXPORT_SYMBOL(llog_start_commit_thread);
531
/* Arguments handed from llog_recovery_generic() to log_process_thread().
 * llpa_sem serializes users of this single static instance: the starter
 * takes it before filling the fields, and the spawned thread releases it
 * once it has copied them out. */
static struct llog_process_args {
        struct semaphore         llpa_sem;
        struct llog_ctxt        *llpa_ctxt;  /* ctxt ref, dropped by thread */
        void                    *llpa_cb;    /* cast to llog_cb_t by thread */
        void                    *llpa_arg;   /* struct llog_logid * */
} llpa;
538
/* Initialize a commit master: thread bookkeeping, llcd lists and locks,
 * the wait queue, and the static llpa semaphore.  Always returns 0.
 * NOTE(review): lcm_llcd_maxfree and lcm_thread_total are not set here -
 * presumably the caller zeroes/initializes the struct; confirm. */
int llog_init_commit_master(struct llog_commit_master *lcm)
{
        CFS_INIT_LIST_HEAD(&lcm->lcm_thread_busy);
        CFS_INIT_LIST_HEAD(&lcm->lcm_thread_idle);
        spin_lock_init(&lcm->lcm_thread_lock);
        atomic_set(&lcm->lcm_thread_numidle, 0);
        cfs_waitq_init(&lcm->lcm_waitq);
        CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_pending);
        CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_resend);
        CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_free);
        spin_lock_init(&lcm->lcm_llcd_lock);
        atomic_set(&lcm->lcm_llcd_numfree, 0);
        lcm->lcm_llcd_minfree = 0;
        lcm->lcm_thread_max = 5;
        /* semaphore protecting the static llog_process_args instance */
        sema_init(&llpa.llpa_sem, 1);
        return 0;
}
EXPORT_SYMBOL(llog_init_commit_master);
558
559 int llog_cleanup_commit_master(struct llog_commit_master *lcm,
560                                int force)
561 {
562         lcm->lcm_flags |= LLOG_LCM_FL_EXIT;
563         if (force)
564                 lcm->lcm_flags |= LLOG_LCM_FL_EXIT_FORCE;
565         cfs_waitq_signal(&lcm->lcm_waitq);
566
567         wait_event_interruptible(lcm->lcm_waitq,
568                                  atomic_read(&lcm->lcm_thread_total) == 0);
569         return 0;
570 }
571 EXPORT_SYMBOL(llog_cleanup_commit_master);
572
573 static int log_process_thread(void *args)
574 {
575         struct llog_process_args *data = args;
576         struct llog_ctxt *ctxt = data->llpa_ctxt;
577         void   *cb = data->llpa_cb;
578         struct llog_logid logid = *(struct llog_logid *)(data->llpa_arg);
579         struct llog_handle *llh = NULL;
580         int rc;
581         ENTRY;
582
583         mutex_up(&data->llpa_sem);
584         ptlrpc_daemonize("llog_process");     /* thread does IO to log files */
585
586         rc = llog_create(ctxt, &llh, &logid, NULL);
587         if (rc) {
588                 CERROR("llog_create failed %d\n", rc);
589                 GOTO(out, rc);
590         }
591         rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL);
592         if (rc) {
593                 CERROR("llog_init_handle failed %d\n", rc);
594                 GOTO(release_llh, rc);
595         }
596
597         if (cb) {
598                 rc = llog_cat_process(llh, (llog_cb_t)cb, NULL);
599                 if (rc != LLOG_PROC_BREAK)
600                         CERROR("llog_cat_process failed %d\n", rc);
601         } else {
602                 CWARN("no callback function for recovery\n");
603         }
604
605         CDEBUG(D_HA, "send llcd %p:%p forcibly after recovery\n",
606                ctxt->loc_llcd, ctxt);
607         llog_sync(ctxt, NULL);
608
609 release_llh:
610         rc = llog_cat_put(llh);
611         if (rc)
612                 CERROR("llog_cat_put failed %d\n", rc);
613 out:
614         llog_ctxt_put(ctxt);
615         RETURN(rc);
616 }
617
618 static int llog_recovery_generic(struct llog_ctxt *ctxt, void *handle,void *arg)
619 {
620         struct obd_device *obd = ctxt->loc_obd;
621         int rc;
622         ENTRY;
623
624         if (obd->obd_stopping)
625                 RETURN(-ENODEV);
626
627         mutex_down(&llpa.llpa_sem);
628         llpa.llpa_cb = handle;
629         llpa.llpa_arg = arg;
630         llpa.llpa_ctxt = llog_ctxt_get(ctxt);
631         if (!llpa.llpa_ctxt) {
632                 up(&llpa.llpa_sem);
633                 RETURN(-ENODEV);
634         }
635         rc = cfs_kernel_thread(log_process_thread, &llpa, CLONE_VM | CLONE_FILES);
636         if (rc < 0) {
637                 llog_ctxt_put(ctxt);
638                 CERROR("error starting log_process_thread: %d\n", rc);
639         } else {
640                 CDEBUG(D_HA, "log_process_thread: %d\n", rc);
641                 rc = 0;
642         }
643
644         RETURN(rc);
645 }
646
/* Prepare @ctxt for replaying cancels after a (re)connect: flush any
 * llcd left over from before the connection dropped, grab a fresh one,
 * and start a recovery thread processing catalog @logid through the
 * ctxt's llog_proc_cb.  Returns 0, -ENOMEM, or the recovery-start rc. */
int llog_repl_connect(struct llog_ctxt *ctxt, int count,
                      struct llog_logid *logid, struct llog_gen *gen,
                      struct obd_uuid *uuid)
{
        struct llog_canceld_ctxt *llcd;
        int rc;
        ENTRY;

        /* send back llcd before recovery from llog */
        if (ctxt->loc_llcd != NULL) {
                /* NOTE(review): loc_llcd is peeked without loc_sem here;
                 * presumably connects are serialized per ctxt - confirm */
                CWARN("llcd %p:%p not empty\n", ctxt->loc_llcd, ctxt);
                llog_sync(ctxt, NULL);
        }

        mutex_down(&ctxt->loc_sem);
        ctxt->loc_gen = *gen;
        llcd = ctxt_llcd_grab(ctxt);
        if (llcd == NULL) {
                CERROR("couldn't get an llcd\n");
                mutex_up(&ctxt->loc_sem);
                RETURN(-ENOMEM);
        }
        mutex_up(&ctxt->loc_sem);

        rc = llog_recovery_generic(ctxt, ctxt->llog_proc_cb, logid);
        if (rc != 0) {
                /* recovery will not run, so release the llcd we grabbed */
                ctxt_llcd_put(ctxt);
                CERROR("error recovery process: %d\n", rc);
        }
        RETURN(rc);
}
EXPORT_SYMBOL(llog_repl_connect);
679
680 #else /* !__KERNEL__ */
681
/* Userspace build: there are no commit threads, so replicator cancel
 * is a no-op that reports success. */
int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
                         struct lov_stripe_md *lsm, int count,
                         struct llog_cookie *cookies, int flags)
{
        return 0;
}
688 #endif