Whamcloud - gitweb
ORNL-22 general ptlrpcd threads pool support
[fs/lustre-release.git] / lustre / ptlrpc / recov_thread.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * Copyright (c) 2011 Whamcloud, Inc.
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  *
39  * lustre/ptlrpc/recov_thread.c
40  *
41  * OST<->MDS recovery logging thread.
42  * Invariants in implementation:
43  * - we do not share logs among different OST<->MDS connections, so that
44  *   if an OST or MDS fails it need only look at log(s) relevant to itself
45  *
46  * Author: Andreas Dilger   <adilger@clusterfs.com>
47  *         Yury Umanets     <yury.umanets@sun.com>
48  *         Alexey Lyashkov  <alexey.lyashkov@sun.com>
49  */
50
51 #define DEBUG_SUBSYSTEM S_LOG
52
53 #ifndef EXPORT_SYMTAB
54 # define EXPORT_SYMTAB
55 #endif
56
57 #ifdef __KERNEL__
58 # include <libcfs/libcfs.h>
59 #else
60 # include <libcfs/list.h>
61 # include <liblustre.h>
62 #endif
63
64 #include <obd_class.h>
65 #include <obd_support.h>
66 #include <obd_class.h>
67 #include <lustre_net.h>
68 #include <lnet/types.h>
69 #include <libcfs/list.h>
70 #include <lustre_log.h>
71 #include "ptlrpc_internal.h"
72
73 static cfs_atomic_t               llcd_count = CFS_ATOMIC_INIT(0);
74 static cfs_mem_cache_t           *llcd_cache = NULL;
75
76 #ifdef __KERNEL__
77 enum {
78         LLOG_LCM_FL_START       = 1 << 0,
79         LLOG_LCM_FL_EXIT        = 1 << 1
80 };
81
82 struct llcd_async_args {
83         struct llog_canceld_ctxt *la_ctxt;
84 };
85
86 static void llcd_print(struct llog_canceld_ctxt *llcd,
87                        const char *func, int line)
88 {
89         CDEBUG(D_RPCTRACE, "Llcd (%p) at %s:%d:\n", llcd, func, line);
90         CDEBUG(D_RPCTRACE, "  size: %d\n", llcd->llcd_size);
91         CDEBUG(D_RPCTRACE, "  ctxt: %p\n", llcd->llcd_ctxt);
92         CDEBUG(D_RPCTRACE, "  lcm : %p\n", llcd->llcd_lcm);
93         CDEBUG(D_RPCTRACE, "  cookiebytes : %d\n", llcd->llcd_cookiebytes);
94 }
95
96 /**
97  * Allocate new llcd from cache, init it and return to caller.
98  * Bumps number of objects allocated.
99  */
100 static struct llog_canceld_ctxt *llcd_alloc(struct llog_commit_master *lcm)
101 {
102         struct llog_canceld_ctxt *llcd;
103         int size, overhead;
104
105         LASSERT(lcm != NULL);
106
107         /*
108          * We want to send one page of cookies with rpc header. This buffer
109          * will be assigned later to the rpc, this is why we preserve the
110          * space for rpc header.
111          */
112         size = CFS_PAGE_SIZE - lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL);
113         overhead =  offsetof(struct llog_canceld_ctxt, llcd_cookies);
114         OBD_SLAB_ALLOC(llcd, llcd_cache, CFS_ALLOC_STD, size + overhead);
115         if (!llcd)
116                 return NULL;
117
118         CFS_INIT_LIST_HEAD(&llcd->llcd_list);
119         llcd->llcd_cookiebytes = 0;
120         llcd->llcd_size = size;
121
122         cfs_spin_lock(&lcm->lcm_lock);
123         llcd->llcd_lcm = lcm;
124         cfs_atomic_inc(&lcm->lcm_count);
125         cfs_list_add_tail(&llcd->llcd_list, &lcm->lcm_llcds);
126         cfs_spin_unlock(&lcm->lcm_lock);
127         cfs_atomic_inc(&llcd_count);
128
129         CDEBUG(D_RPCTRACE, "Alloc llcd %p on lcm %p (%d)\n",
130                llcd, lcm, cfs_atomic_read(&lcm->lcm_count));
131
132         return llcd;
133 }
134
135 /**
136  * Returns passed llcd to cache.
137  */
138 static void llcd_free(struct llog_canceld_ctxt *llcd)
139 {
140         struct llog_commit_master *lcm = llcd->llcd_lcm;
141         int size;
142
143         if (lcm) {
144                 if (cfs_atomic_read(&lcm->lcm_count) == 0) {
145                         CERROR("Invalid llcd free %p\n", llcd);
146                         llcd_print(llcd, __FUNCTION__, __LINE__);
147                         LBUG();
148                 }
149                 cfs_spin_lock(&lcm->lcm_lock);
150                 LASSERT(!cfs_list_empty(&llcd->llcd_list));
151                 cfs_list_del_init(&llcd->llcd_list);
152                 cfs_atomic_dec(&lcm->lcm_count);
153                 cfs_spin_unlock(&lcm->lcm_lock);
154
155                 CDEBUG(D_RPCTRACE, "Free llcd %p on lcm %p (%d)\n",
156                        llcd, lcm, cfs_atomic_read(&lcm->lcm_count));
157         }
158
159         LASSERT(cfs_atomic_read(&llcd_count) > 0);
160         cfs_atomic_dec(&llcd_count);
161
162         size = offsetof(struct llog_canceld_ctxt, llcd_cookies) +
163             llcd->llcd_size;
164         OBD_SLAB_FREE(llcd, llcd_cache, size);
165 }
166
167 /**
168  * Checks if passed cookie fits into llcd free space buffer. Returns
169  * 1 if yes and 0 otherwise.
170  */
171 static inline int
172 llcd_fit(struct llog_canceld_ctxt *llcd, struct llog_cookie *cookies)
173 {
174         return (llcd->llcd_size - llcd->llcd_cookiebytes >= sizeof(*cookies));
175 }
176
177 /**
178  * Copy passed @cookies to @llcd.
179  */
180 static inline void
181 llcd_copy(struct llog_canceld_ctxt *llcd, struct llog_cookie *cookies)
182 {
183         LASSERT(llcd_fit(llcd, cookies));
184         memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes,
185               cookies, sizeof(*cookies));
186         llcd->llcd_cookiebytes += sizeof(*cookies);
187 }
188
189 /**
190  * Llcd completion function. Called uppon llcd send finish regardless
191  * sending result. Error is passed in @rc. Note, that this will be called
192  * in cleanup time when all inflight rpcs aborted.
193  */
194 static int
195 llcd_interpret(const struct lu_env *env,
196                struct ptlrpc_request *req, void *args, int rc)
197 {
198         struct llcd_async_args *la = args;
199         struct llog_canceld_ctxt *llcd = la->la_ctxt;
200
201         CDEBUG(D_RPCTRACE, "Sent llcd %p (%d) - killing it\n", llcd, rc);
202         llcd_free(llcd);
203         return 0;
204 }
205
206 /**
207  * Send @llcd to remote node. Free llcd uppon completion or error. Sending
208  * is performed in async style so this function will return asap without
209  * blocking.
210  */
211 static int llcd_send(struct llog_canceld_ctxt *llcd)
212 {
213         char *bufs[2] = { NULL, (char *)llcd->llcd_cookies };
214         struct obd_import *import = NULL;
215         struct llog_commit_master *lcm;
216         struct llcd_async_args *la;
217         struct ptlrpc_request *req;
218         struct llog_ctxt *ctxt;
219         int rc;
220         ENTRY;
221
222         ctxt = llcd->llcd_ctxt;
223         if (!ctxt) {
224                 CERROR("Invalid llcd with NULL ctxt found (%p)\n",
225                        llcd);
226                 llcd_print(llcd, __FUNCTION__, __LINE__);
227                 LBUG();
228         }
229         LASSERT_SEM_LOCKED(&ctxt->loc_sem);
230
231         if (llcd->llcd_cookiebytes == 0)
232                 GOTO(exit, rc = 0);
233
234         lcm = llcd->llcd_lcm;
235
236         /*
237          * Check if we're in exit stage. Do not send llcd in
238          * this case.
239          */
240         if (cfs_test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags))
241                 GOTO(exit, rc = -ENODEV);
242
243         CDEBUG(D_RPCTRACE, "Sending llcd %p\n", llcd);
244
245         import = llcd->llcd_ctxt->loc_imp;
246         if (!import || (import == LP_POISON) ||
247             (import->imp_client == LP_POISON)) {
248                 CERROR("Invalid import %p for llcd %p\n",
249                        import, llcd);
250                 GOTO(exit, rc = -ENODEV);
251         }
252
253         OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_RECOV, 10);
254
255         /*
256          * No need to get import here as it is already done in
257          * llog_receptor_accept().
258          */
259         req = ptlrpc_request_alloc(import, &RQF_LOG_CANCEL);
260         if (req == NULL) {
261                 CERROR("Can't allocate request for sending llcd %p\n",
262                        llcd);
263                 GOTO(exit, rc = -ENOMEM);
264         }
265         req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES,
266                              RCL_CLIENT, llcd->llcd_cookiebytes);
267
268         rc = ptlrpc_request_bufs_pack(req, LUSTRE_LOG_VERSION,
269                                       OBD_LOG_CANCEL, bufs, NULL);
270         if (rc) {
271                 ptlrpc_request_free(req);
272                 GOTO(exit, rc);
273         }
274
275         ptlrpc_at_set_req_timeout(req);
276         ptlrpc_request_set_replen(req);
277
278         /* bug 5515 */
279         req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
280         req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
281
282         req->rq_interpret_reply = (ptlrpc_interpterer_t)llcd_interpret;
283
284         CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
285         la = ptlrpc_req_async_args(req);
286         la->la_ctxt = llcd;
287
288         /* llog cancels will be replayed after reconnect so this will do twice
289          * first from replay llog, second for resended rpc */
290         req->rq_no_delay = req->rq_no_resend = 1;
291
292         ptlrpc_set_add_new_req(&lcm->lcm_pc, req);
293         RETURN(0);
294 exit:
295         CDEBUG(D_RPCTRACE, "Refused llcd %p\n", llcd);
296         llcd_free(llcd);
297         return rc;
298 }
299
300 /**
301  * Attach @llcd to @ctxt. Establish llcd vs. ctxt reserve connection
302  * so hat they can refer each other.
303  */
304 static int
305 llcd_attach(struct llog_ctxt *ctxt, struct llog_canceld_ctxt *llcd)
306 {
307         LASSERT(ctxt != NULL && llcd != NULL);
308         LASSERT_SEM_LOCKED(&ctxt->loc_sem);
309         LASSERT(ctxt->loc_llcd == NULL);
310         llcd->llcd_ctxt = llog_ctxt_get(ctxt);
311         ctxt->loc_llcd = llcd;
312
313         CDEBUG(D_RPCTRACE, "Attach llcd %p to ctxt %p\n",
314                llcd, ctxt);
315
316         return 0;
317 }
318
319 /**
320  * Opposite to llcd_attach(). Detaches llcd from its @ctxt. This makes
321  * sure that this llcd will not be found another time we try to cancel.
322  */
323 static struct llog_canceld_ctxt *llcd_detach(struct llog_ctxt *ctxt)
324 {
325         struct llog_canceld_ctxt *llcd;
326
327         LASSERT(ctxt != NULL);
328         LASSERT_SEM_LOCKED(&ctxt->loc_sem);
329
330         llcd = ctxt->loc_llcd;
331         if (!llcd)
332                 return NULL;
333
334         CDEBUG(D_RPCTRACE, "Detach llcd %p from ctxt %p\n",
335                llcd, ctxt);
336
337         ctxt->loc_llcd = NULL;
338         llog_ctxt_put(ctxt);
339         return llcd;
340 }
341
342 /**
343  * Return @llcd cached in @ctxt. Allocate new one if required. Attach it
344  * to ctxt so that it may be used for gathering cookies and sending.
345  */
346 static struct llog_canceld_ctxt *llcd_get(struct llog_ctxt *ctxt)
347 {
348         struct llog_canceld_ctxt *llcd;
349         LASSERT(ctxt);
350         llcd = llcd_alloc(ctxt->loc_lcm);
351         if (!llcd) {
352                 CERROR("Can't alloc an llcd for ctxt %p\n", ctxt);
353                 return NULL;
354         }
355         llcd_attach(ctxt, llcd);
356         return llcd;
357 }
358
359 /**
360  * Deatch llcd from its @ctxt. Free llcd.
361  */
362 static void llcd_put(struct llog_ctxt *ctxt)
363 {
364         struct llog_canceld_ctxt *llcd;
365
366         llcd = llcd_detach(ctxt);
367         if (llcd)
368                 llcd_free(llcd);
369 }
370
371 /**
372  * Detach llcd from its @ctxt so that nobody will find it with try to
373  * re-use. Send llcd to remote node.
374  */
375 static int llcd_push(struct llog_ctxt *ctxt)
376 {
377         struct llog_canceld_ctxt *llcd;
378         int rc;
379
380         /*
381          * Make sure that this llcd will not be sent again as we detach
382          * it from ctxt.
383          */
384         llcd = llcd_detach(ctxt);
385         if (!llcd) {
386                 CERROR("Invalid detached llcd found %p\n", llcd);
387                 llcd_print(llcd, __FUNCTION__, __LINE__);
388                 LBUG();
389         }
390
391         rc = llcd_send(llcd);
392         if (rc)
393                 CERROR("Couldn't send llcd %p (%d)\n", llcd, rc);
394         return rc;
395 }
396
397 /**
398  * Start recovery thread which actually deals llcd sending. This
399  * is all ptlrpc standard thread based so there is not much of work
400  * to do.
401  */
402 int llog_recov_thread_start(struct llog_commit_master *lcm)
403 {
404         int rc;
405         ENTRY;
406
407         rc = ptlrpcd_start(-1, 1, lcm->lcm_name, &lcm->lcm_pc);
408         if (rc) {
409                 CERROR("Error %d while starting recovery thread %s\n",
410                        rc, lcm->lcm_name);
411                 RETURN(rc);
412         }
413         RETURN(rc);
414 }
415 EXPORT_SYMBOL(llog_recov_thread_start);
416
417 /**
418  * Stop recovery thread. Complement to llog_recov_thread_start().
419  */
420 void llog_recov_thread_stop(struct llog_commit_master *lcm, int force)
421 {
422         ENTRY;
423
424         /*
425          * Let all know that we're stopping. This will also make
426          * llcd_send() refuse any new llcds.
427          */
428         cfs_set_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags);
429
430         /*
431          * Stop processing thread. No new rpcs will be accepted for
432          * for processing now.
433          */
434         ptlrpcd_stop(&lcm->lcm_pc, force);
435
436         /*
437          * By this point no alive inflight llcds should be left. Only
438          * those forgotten in sync may still be attached to ctxt. Let's
439          * print them.
440          */
441         if (cfs_atomic_read(&lcm->lcm_count) != 0) {
442                 struct llog_canceld_ctxt *llcd;
443                 cfs_list_t               *tmp;
444
445                 CERROR("Busy llcds found (%d) on lcm %p\n",
446                        cfs_atomic_read(&lcm->lcm_count), lcm);
447
448                 cfs_spin_lock(&lcm->lcm_lock);
449                 cfs_list_for_each(tmp, &lcm->lcm_llcds) {
450                         llcd = cfs_list_entry(tmp, struct llog_canceld_ctxt,
451                                               llcd_list);
452                         llcd_print(llcd, __FUNCTION__, __LINE__);
453                 }
454                 cfs_spin_unlock(&lcm->lcm_lock);
455
456                 /*
457                  * No point to go further with busy llcds at this point
458                  * as this is clear bug. It might mean we got hanging
459                  * rpc which holds import ref and this means we will not
460                  * be able to cleanup anyways.
461                  *
462                  * Or we just missed to kill them when they were not
463                  * attached to ctxt. In this case our slab will remind
464                  * us about this a bit later.
465                  */
466                 LBUG();
467         }
468         EXIT;
469 }
470 EXPORT_SYMBOL(llog_recov_thread_stop);
471
472 /**
473  * Initialize commit master structure and start recovery thread on it.
474  */
475 struct llog_commit_master *llog_recov_thread_init(char *name)
476 {
477         struct llog_commit_master *lcm;
478         int rc;
479         ENTRY;
480
481         OBD_ALLOC_PTR(lcm);
482         if (!lcm)
483                 RETURN(NULL);
484
485         /*
486          * Try to create threads with unique names.
487          */
488         snprintf(lcm->lcm_name, sizeof(lcm->lcm_name),
489                  "lcm_%s", name);
490
491         cfs_atomic_set(&lcm->lcm_count, 0);
492         cfs_atomic_set(&lcm->lcm_refcount, 1);
493         cfs_spin_lock_init(&lcm->lcm_lock);
494         CFS_INIT_LIST_HEAD(&lcm->lcm_llcds);
495         rc = llog_recov_thread_start(lcm);
496         if (rc) {
497                 CERROR("Can't start commit thread, rc %d\n", rc);
498                 GOTO(out, rc);
499         }
500         RETURN(lcm);
501 out:
502         OBD_FREE_PTR(lcm);
503         return NULL;
504 }
505 EXPORT_SYMBOL(llog_recov_thread_init);
506
507 /**
508  * Finalize commit master and its recovery thread.
509  */
510 void llog_recov_thread_fini(struct llog_commit_master *lcm, int force)
511 {
512         ENTRY;
513         llog_recov_thread_stop(lcm, force);
514         lcm_put(lcm);
515         EXIT;
516 }
517 EXPORT_SYMBOL(llog_recov_thread_fini);
518
519 static int llog_recov_thread_replay(struct llog_ctxt *ctxt,
520                                     void *cb, void *arg)
521 {
522         struct obd_device *obd = ctxt->loc_obd;
523         struct llog_process_cat_args *lpca;
524         int rc;
525         ENTRY;
526
527         if (obd->obd_stopping)
528                 RETURN(-ENODEV);
529
530         /*
531          * This will be balanced in llog_cat_process_thread()
532          */
533         OBD_ALLOC_PTR(lpca);
534         if (!lpca)
535                 RETURN(-ENOMEM);
536
537         lpca->lpca_cb = cb;
538         lpca->lpca_arg = arg;
539
540         /*
541          * This will be balanced in llog_cat_process_thread()
542          */
543         lpca->lpca_ctxt = llog_ctxt_get(ctxt);
544         if (!lpca->lpca_ctxt) {
545                 OBD_FREE_PTR(lpca);
546                 RETURN(-ENODEV);
547         }
548         rc = cfs_create_thread(llog_cat_process_thread, lpca, CFS_DAEMON_FLAGS);
549         if (rc < 0) {
550                 CERROR("Error starting llog_cat_process_thread(): %d\n", rc);
551                 OBD_FREE_PTR(lpca);
552                 llog_ctxt_put(ctxt);
553         } else {
554                 CDEBUG(D_HA, "Started llog_cat_process_thread(): %d\n", rc);
555                 rc = 0;
556         }
557
558         RETURN(rc);
559 }
560
561 int llog_obd_repl_connect(struct llog_ctxt *ctxt,
562                           struct llog_logid *logid, struct llog_gen *gen,
563                           struct obd_uuid *uuid)
564 {
565         int rc;
566         ENTRY;
567
568         /*
569          * Send back cached llcd from llog before recovery if we have any.
570          * This is void is nothing cached is found there.
571          */
572         llog_sync(ctxt, NULL);
573
574         /*
575          * Start recovery in separate thread.
576          */
577         cfs_mutex_down(&ctxt->loc_sem);
578         ctxt->loc_gen = *gen;
579         rc = llog_recov_thread_replay(ctxt, ctxt->llog_proc_cb, logid);
580         cfs_mutex_up(&ctxt->loc_sem);
581
582         RETURN(rc);
583 }
584 EXPORT_SYMBOL(llog_obd_repl_connect);
585
586 /**
587  * Deleted objects have a commit callback that cancels the MDS
588  * log record for the deletion. The commit callback calls this
589  * function.
590  */
591 int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
592                          struct lov_stripe_md *lsm, int count,
593                          struct llog_cookie *cookies, int flags)
594 {
595         struct llog_commit_master *lcm;
596         struct llog_canceld_ctxt *llcd;
597         int rc = 0;
598         ENTRY;
599
600         LASSERT(ctxt != NULL);
601
602         cfs_mutex_down(&ctxt->loc_sem);
603         if (!ctxt->loc_lcm) {
604                 CDEBUG(D_RPCTRACE, "No lcm for ctxt %p\n", ctxt);
605                 GOTO(out, rc = -ENODEV);
606         }
607         lcm = ctxt->loc_lcm;
608         CDEBUG(D_INFO, "cancel on lsm %p\n", lcm);
609
610         /*
611          * Let's check if we have all structures alive. We also check for
612          * possible shutdown. Do nothing if we're stopping.
613          */
614         if (ctxt->loc_imp == NULL) {
615                 CDEBUG(D_RPCTRACE, "No import for ctxt %p\n", ctxt);
616                 GOTO(out, rc = -ENODEV);
617         }
618
619         if (cfs_test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) {
620                 CDEBUG(D_RPCTRACE, "Commit thread is stopping for ctxt %p\n",
621                        ctxt);
622                 GOTO(out, rc = -ENODEV);
623         }
624
625         llcd = ctxt->loc_llcd;
626
627         if (count > 0 && cookies != NULL) {
628                 /*
629                  * Get new llcd from ctxt if required.
630                  */
631                 if (!llcd) {
632                         llcd = llcd_get(ctxt);
633                         if (!llcd)
634                                 GOTO(out, rc = -ENOMEM);
635                         /*
636                          * Allocation is successful, let's check for stop
637                          * flag again to fall back as soon as possible.
638                          */
639                         if (cfs_test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags))
640                                 GOTO(out, rc = -ENODEV);
641                 }
642
643                 /*
644                  * Llcd does not have enough room for @cookies. Let's push
645                  * it out and allocate new one.
646                  */
647                 if (!llcd_fit(llcd, cookies)) {
648                         rc = llcd_push(ctxt);
649                         if (rc)
650                                 GOTO(out, rc);
651                         llcd = llcd_get(ctxt);
652                         if (!llcd)
653                                 GOTO(out, rc = -ENOMEM);
654                         /*
655                          * Allocation is successful, let's check for stop
656                          * flag again to fall back as soon as possible.
657                          */
658                         if (cfs_test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags))
659                                 GOTO(out, rc = -ENODEV);
660                 }
661
662                 /*
663                  * Copy cookies to @llcd, no matter old or new allocated
664                  * one.
665                  */
666                 llcd_copy(llcd, cookies);
667         }
668
669         /*
670          * Let's check if we need to send copied @cookies asap. If yes
671          * then do it.
672          */
673         if (llcd && (flags & OBD_LLOG_FL_SENDNOW)) {
674                 CDEBUG(D_RPCTRACE, "Sync llcd %p\n", llcd);
675                 rc = llcd_push(ctxt);
676                 if (rc)
677                         GOTO(out, rc);
678         }
679         EXIT;
680 out:
681         if (rc)
682                 llcd_put(ctxt);
683         cfs_mutex_up(&ctxt->loc_sem);
684         return rc;
685 }
686 EXPORT_SYMBOL(llog_obd_repl_cancel);
687
688 int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp)
689 {
690         int rc = 0;
691         ENTRY;
692
693         /*
694          * Flush any remaining llcd.
695          */
696         cfs_mutex_down(&ctxt->loc_sem);
697         if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {
698                 /*
699                  * This is ost->mds connection, we can't be sure that mds
700                  * can still receive cookies, let's killed the cached llcd.
701                  */
702                 CDEBUG(D_RPCTRACE, "Kill cached llcd\n");
703                 llcd_put(ctxt);
704                 cfs_mutex_up(&ctxt->loc_sem);
705         } else {
706                 /*
707                  * This is either llog_sync() from generic llog code or sync
708                  * on client disconnect. In either way let's do it and send
709                  * llcds to the target with waiting for completion.
710                  */
711                 CDEBUG(D_RPCTRACE, "Sync cached llcd\n");
712                 cfs_mutex_up(&ctxt->loc_sem);
713                 rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW);
714         }
715         RETURN(rc);
716 }
717 EXPORT_SYMBOL(llog_obd_repl_sync);
718
719 #else /* !__KERNEL__ */
720
721 int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
722                          struct lov_stripe_md *lsm, int count,
723                          struct llog_cookie *cookies, int flags)
724 {
725         return 0;
726 }
727 #endif
728
729 /**
730  * Module init time fucntion. Initializes slab for llcd objects.
731  */
732 int llog_recov_init(void)
733 {
734         int llcd_size;
735
736         llcd_size = CFS_PAGE_SIZE -
737                 lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL);
738         llcd_size += offsetof(struct llog_canceld_ctxt, llcd_cookies);
739         llcd_cache = cfs_mem_cache_create("llcd_cache", llcd_size, 0, 0);
740         if (!llcd_cache) {
741                 CERROR("Error allocating llcd cache\n");
742                 return -ENOMEM;
743         }
744         return 0;
745 }
746
747 /**
748  * Module fini time fucntion. Releases slab for llcd objects.
749  */
750 void llog_recov_fini(void)
751 {
752         /*
753          * Kill llcd cache when thread is stopped and we're sure no
754          * llcd in use left.
755          */
756         if (llcd_cache) {
757                 /*
758                  * In 2.6.22 cfs_mem_cache_destroy() will not return error
759                  * for busy resources. Let's check it another way.
760                  */
761                 LASSERTF(cfs_atomic_read(&llcd_count) == 0,
762                          "Can't destroy llcd cache! Number of "
763                          "busy llcds: %d\n", cfs_atomic_read(&llcd_count));
764                 cfs_mem_cache_destroy(llcd_cache);
765                 llcd_cache = NULL;
766         }
767 }