Whamcloud - gitweb
b=16098
[fs/lustre-release.git] / lustre / ptlrpc / recov_thread.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ptlrpc/recov_thread.c
37  *
38  * OST<->MDS recovery logging thread.
39  * Invariants in implementation:
40  * - we do not share logs among different OST<->MDS connections, so that
41  *   if an OST or MDS fails it need only look at log(s) relevant to itself
42  *
43  * Author: Andreas Dilger   <adilger@clusterfs.com>
44  *         Yury Umanets     <yury.umanets@sun.com>
45  *         Alexey Lyashkov  <alexey.lyashkov@sun.com>
46  */
47
48 #define DEBUG_SUBSYSTEM S_LOG
49
50 #ifndef EXPORT_SYMTAB
51 # define EXPORT_SYMTAB
52 #endif
53
54 #ifdef __KERNEL__
55 # include <libcfs/libcfs.h>
56 #else
57 # include <libcfs/list.h>
58 # include <liblustre.h>
59 #endif
60
61 #include <obd_class.h>
62 #include <obd_support.h>
63 #include <obd_class.h>
64 #include <lustre_net.h>
65 #include <lnet/types.h>
66 #include <libcfs/list.h>
67 #include <lustre_log.h>
68 #include "ptlrpc_internal.h"
69
70 static atomic_t                   llcd_count = ATOMIC_INIT(0);
71 static cfs_mem_cache_t           *llcd_cache = NULL;
72
73 #ifdef __KERNEL__
74 enum {
75         LLOG_LCM_FL_START       = 1 << 0,
76         LLOG_LCM_FL_EXIT        = 1 << 1
77 };
78
79 /** 
80  * Allocate new llcd from cache, init it and return to caller.
81  * Bumps number of objects allocated.
82  */
83 static struct llog_canceld_ctxt *llcd_alloc(void)
84 {
85         struct llog_canceld_ctxt *llcd;
86         int llcd_size;
87
88         /* 
89          * Payload of lustre_msg V2 is bigger.
90          */
91         llcd_size = CFS_PAGE_SIZE - 
92                 lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL);
93         llcd_size += offsetof(struct llog_canceld_ctxt, llcd_cookies);
94         OBD_SLAB_ALLOC(llcd, llcd_cache, CFS_ALLOC_STD, llcd_size);
95         if (!llcd)
96                 return NULL;
97
98         llcd->llcd_size = llcd_size;
99         llcd->llcd_cookiebytes = 0;
100         atomic_inc(&llcd_count);
101         return llcd;
102 }
103
104 /**
105  * Returns passed llcd to cache.
106  */
107 static void llcd_free(struct llog_canceld_ctxt *llcd)
108 {
109         OBD_SLAB_FREE(llcd, llcd_cache, llcd->llcd_size);
110         atomic_dec(&llcd_count);
111 }
112
113 /**
114  * Copy passed @cookies to @llcd.
115  */
116 static void llcd_copy(struct llog_canceld_ctxt *llcd, 
117                       struct llog_cookie *cookies)
118 {
119         memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes, 
120               cookies, sizeof(*cookies));
121         llcd->llcd_cookiebytes += sizeof(*cookies);
122 }
123
124 /**
125  * Checks if passed cookie fits into llcd free space buffer. Returns
126  * 1 if yes and 0 otherwise.
127  */
128 static int llcd_fit(struct llog_canceld_ctxt *llcd,
129                  struct llog_cookie *cookies)
130 {
131         return (llcd->llcd_size - 
132                 llcd->llcd_cookiebytes) >= sizeof(*cookies);
133 }
134
135 static void llcd_print(struct llog_canceld_ctxt *llcd, 
136                        const char *func, int line) 
137 {
138         CDEBUG(D_RPCTRACE, "Llcd (%p) at %s:%d:\n", llcd, func, line);
139         CDEBUG(D_RPCTRACE, "  size: %d\n", llcd->llcd_size);
140         CDEBUG(D_RPCTRACE, "  ctxt: %p\n", llcd->llcd_ctxt);
141         CDEBUG(D_RPCTRACE, "  lcm : %p\n", llcd->llcd_lcm);
142         CDEBUG(D_RPCTRACE, "  cookiebytes : %d\n", llcd->llcd_cookiebytes);
143 }
144
145 /**
146  * Llcd completion function. Called uppon llcd send finish regardless
147  * sending result. Error is passed in @rc. Note, that this will be called
148  * in cleanup time when all inflight rpcs aborted.
149  */
150 static int 
151 llcd_interpret(struct ptlrpc_request *req, void *noused, int rc)
152 {
153         struct llog_canceld_ctxt *llcd = req->rq_async_args.pointer_arg[0];
154         CDEBUG(D_RPCTRACE, "Sent llcd %p (%d)\n", llcd, rc);
155         llcd_free(llcd);
156         return 0;
157 }
158  
159 /**
160  * Send @llcd to remote node. Free llcd uppon completion or error. Sending
161  * is performed in async style so this function will return asap without 
162  * blocking.
163  */
164 static int llcd_send(struct llog_canceld_ctxt *llcd)
165 {
166         char *bufs[2] = { NULL, (char *)llcd->llcd_cookies };
167         struct obd_import *import = NULL;
168         struct llog_commit_master *lcm;
169         struct ptlrpc_request *req;
170         struct llog_ctxt *ctxt;
171         int rc;
172         ENTRY;
173
174         ctxt = llcd->llcd_ctxt;
175         if (!ctxt) {
176                 CERROR("Invalid llcd with NULL ctxt found (%p)\n", 
177                        llcd);
178                 llcd_print(llcd, __FUNCTION__, __LINE__);
179                 LBUG();
180         }
181         LASSERT_SEM_LOCKED(&ctxt->loc_sem);
182
183         if (llcd->llcd_cookiebytes == 0)
184                 GOTO(exit, rc = 0);
185
186         lcm = llcd->llcd_lcm;
187         
188         /* 
189          * Check if we're in exit stage. Do not send llcd in
190          * this case. 
191          */
192         if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags))
193                 GOTO(exit, rc = -ENODEV);
194
195         CDEBUG(D_RPCTRACE, "Sending llcd %p\n", llcd);
196
197         import = llcd->llcd_ctxt->loc_imp;
198         if (!import || (import == LP_POISON) || 
199             (import->imp_client == LP_POISON)) {
200                 CERROR("Invalid import %p for llcd %p\n", 
201                        import, llcd);
202                 GOTO(exit, rc = -ENODEV);
203         }
204
205         OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_RECOV, 10);
206
207         /*
208          * No need to get import here as it is already done in 
209          * llog_receptor_accept().
210          */
211         req = ptlrpc_request_alloc(import, &RQF_LOG_CANCEL);
212         if (req == NULL) {
213                 CERROR("Can't allocate request for sending llcd %p\n", 
214                        llcd);
215                 GOTO(exit, rc = -ENOMEM);
216         }
217         req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES,
218                              RCL_CLIENT, llcd->llcd_cookiebytes);
219
220         rc = ptlrpc_request_bufs_pack(req, LUSTRE_LOG_VERSION,
221                                       OBD_LOG_CANCEL, bufs, NULL);
222         if (rc) {
223                 ptlrpc_request_free(req);
224                 GOTO(exit, rc);
225         }
226
227         ptlrpc_at_set_req_timeout(req);
228         ptlrpc_request_set_replen(req);
229
230         /* bug 5515 */
231         req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
232         req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
233         req->rq_interpret_reply = llcd_interpret;
234         req->rq_async_args.pointer_arg[0] = llcd;
235         rc = ptlrpc_set_add_new_req(&lcm->lcm_pc, req);
236         if (rc)
237                 GOTO(exit, rc);
238
239         RETURN(0);
240 exit:
241         CDEBUG(D_RPCTRACE, "Refused llcd %p\n", llcd);
242         llcd_free(llcd);
243         return rc;
244 }
245
246 /**
247  * Attach @llcd to @ctxt. Establish llcd vs. ctxt reserve connection
248  * so hat they can refer each other.
249  */
250 static int
251 llcd_attach(struct llog_ctxt *ctxt, struct llog_canceld_ctxt *llcd)
252 {
253         struct llog_commit_master *lcm;
254
255         LASSERT(ctxt != NULL && llcd != NULL);
256         LASSERT_SEM_LOCKED(&ctxt->loc_sem);
257         LASSERT(ctxt->loc_llcd == NULL);
258         lcm = ctxt->loc_lcm;
259         atomic_inc(&lcm->lcm_count);
260         CDEBUG(D_RPCTRACE, "Attach llcd %p to ctxt %p (%d)\n",
261                llcd, ctxt, atomic_read(&lcm->lcm_count));
262         llcd->llcd_ctxt = llog_ctxt_get(ctxt);
263         llcd->llcd_lcm = ctxt->loc_lcm;
264         ctxt->loc_llcd = llcd;
265         return 0;
266 }
267
268 /**
269  * Opposite to llcd_attach(). Detaches llcd from its @ctxt. This makes
270  * sure that this llcd will not be found another time we try to cancel.
271  */
272 static struct llog_canceld_ctxt *llcd_detach(struct llog_ctxt *ctxt)
273 {
274         struct llog_commit_master *lcm;
275         struct llog_canceld_ctxt *llcd;
276
277         LASSERT(ctxt != NULL);
278         LASSERT_SEM_LOCKED(&ctxt->loc_sem);
279
280         llcd = ctxt->loc_llcd;
281         if (!llcd)
282                 return NULL;
283
284         lcm = ctxt->loc_lcm;
285         if (atomic_read(&lcm->lcm_count) == 0) {
286                 CERROR("Invalid detach occured %p:%p\n", ctxt, llcd);
287                 llcd_print(llcd, __FUNCTION__, __LINE__);
288                 LBUG();
289         }
290         atomic_dec(&lcm->lcm_count);
291         ctxt->loc_llcd = NULL;
292         
293         CDEBUG(D_RPCTRACE, "Detach llcd %p from ctxt %p (%d)\n", 
294                llcd, ctxt, atomic_read(&lcm->lcm_count));
295
296         llog_ctxt_put(ctxt);
297         return llcd;
298 }
299
300 /**
301  * Return @llcd cached in @ctxt. Allocate new one if required. Attach it
302  * to ctxt so that it may be used for gathering cookies and sending.
303  */
304 static struct llog_canceld_ctxt *llcd_get(struct llog_ctxt *ctxt)
305 {
306         struct llog_canceld_ctxt *llcd;
307
308         llcd = llcd_alloc();
309         if (!llcd) {
310                 CERROR("Couldn't alloc an llcd for ctxt %p\n", ctxt);
311                 return NULL;
312         }
313         llcd_attach(ctxt, llcd);
314         return llcd;
315 }
316
317 /**
318  * Deatch llcd from its @ctxt. Free llcd.
319  */
320 static void llcd_put(struct llog_ctxt *ctxt)
321 {
322         struct llog_canceld_ctxt *llcd;
323
324         llcd = llcd_detach(ctxt);
325         if (llcd)
326                 llcd_free(llcd);
327 }
328
329 /**
330  * Detach llcd from its @ctxt so that nobody will find it with try to
331  * re-use. Send llcd to remote node.
332  */
333 static int llcd_push(struct llog_ctxt *ctxt)
334 {
335         struct llog_canceld_ctxt *llcd;
336         int rc;
337
338         /*
339          * Make sure that this llcd will not be sent again as we detach 
340          * it from ctxt.
341          */
342         llcd = llcd_detach(ctxt);
343         if (!llcd) {
344                 CERROR("Invalid detached llcd found %p\n", llcd);
345                 llcd_print(llcd, __FUNCTION__, __LINE__);
346                 LBUG();
347         }
348         
349         rc = llcd_send(llcd);
350         if (rc)
351                 CERROR("Couldn't send llcd %p (%d)\n", llcd, rc);
352         return rc;
353 }
354
355 static atomic_t llog_tcount = ATOMIC_INIT(0);
356
357 /**
358  * Start recovery thread which actually deals llcd sending. This
359  * is all ptlrpc standard thread based so there is not much of work
360  * to do.
361  */
362 int llog_recov_thread_start(struct llog_commit_master *lcm)
363 {
364         int rc;
365         ENTRY;
366
367         rc = ptlrpcd_start(lcm->lcm_name, &lcm->lcm_pc);
368         if (rc) {
369                 CERROR("Error %d while starting recovery thread %s\n", 
370                        rc, lcm->lcm_name);
371                 RETURN(rc);
372         }
373         lcm->lcm_set = lcm->lcm_pc.pc_set;
374         atomic_inc(&llog_tcount);
375
376         RETURN(rc);
377 }
378 EXPORT_SYMBOL(llog_recov_thread_start);
379
380 /**
381  * Stop recovery thread. Complement to llog_recov_thread_start().
382  */
383 void llog_recov_thread_stop(struct llog_commit_master *lcm, int force)
384 {
385         ENTRY;
386         
387         /**
388          * Let all know that we're stopping. This will also make 
389          * llcd_send() refuse any new llcds.
390          */
391         set_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags);
392
393         /**
394          * Stop processing thread. No new rpcs will be accepted for
395          * for processing now.
396          */
397         ptlrpcd_stop(&lcm->lcm_pc, force);
398         
399         /*
400          * No llcds on this @lcm should left.
401          */
402         LASSERTF(atomic_read(&lcm->lcm_count) == 0, 
403                  "Busy llcds found on lcm %p - (%d)\n", 
404                  lcm, atomic_read(&lcm->lcm_count));
405         EXIT;
406 }
407 EXPORT_SYMBOL(llog_recov_thread_stop);
408
409 /**
410  * Initialize commit master structure and start recovery thread on it.
411  */
412 struct llog_commit_master *llog_recov_thread_init(char *name)
413 {
414         struct llog_commit_master *lcm;
415         int rc;
416         ENTRY;
417
418         OBD_ALLOC_PTR(lcm);
419         if (!lcm)
420                 RETURN(NULL);
421
422         /*
423          * Try to create threads with unique names and user id.
424          */
425         snprintf(lcm->lcm_name, sizeof(lcm->lcm_name), 
426                  "ll_log_commit_%s_%02d", name, 
427                  atomic_read(&llog_tcount));
428
429         strncpy(lcm->lcm_name, name, sizeof(lcm->lcm_name));
430         atomic_set(&lcm->lcm_count, 0);
431         rc = llog_recov_thread_start(lcm);
432         if (rc) {
433                 CERROR("Can't start commit thread, rc %d\n", rc);
434                 GOTO(out, rc);
435         }
436         RETURN(lcm);
437 out:
438         OBD_FREE_PTR(lcm);
439         return NULL;
440 }
441 EXPORT_SYMBOL(llog_recov_thread_init);
442
443 /**
444  * Finalize commit master and its recovery thread.
445  */
446 void llog_recov_thread_fini(struct llog_commit_master *lcm, int force)
447 {
448         ENTRY;
449         llog_recov_thread_stop(lcm, force);
450         OBD_FREE_PTR(lcm);
451         EXIT;
452 }
453 EXPORT_SYMBOL(llog_recov_thread_fini);
454
455 static int llog_obd_repl_generic(struct llog_ctxt *ctxt, 
456                                  void *handle, void *arg)
457 {
458         struct obd_device *obd = ctxt->loc_obd;
459         struct llog_process_cat_args *lpca;
460         int rc;
461         ENTRY;
462
463         if (obd->obd_stopping)
464                 RETURN(-ENODEV);
465
466         /*
467          * This will be balanced in llog_cat_process_thread()
468          */
469         OBD_ALLOC_PTR(lpca);
470         if (!lpca)
471                 RETURN(-ENOMEM);
472
473         lpca->lpca_cb = handle;
474         lpca->lpca_arg = arg;
475
476         /*
477          * This will be balanced in llog_cat_process_thread()
478          */
479         lpca->lpca_ctxt = llog_ctxt_get(ctxt);
480         if (!lpca->lpca_ctxt) {
481                 OBD_FREE_PTR(lpca);
482                 RETURN(-ENODEV);
483         }
484         rc = cfs_kernel_thread(llog_cat_process_thread, lpca, 
485                                CLONE_VM | CLONE_FILES);
486         if (rc < 0) {
487                 CERROR("Error starting llog_cat_process_thread(): %d\n", rc);
488                 OBD_FREE_PTR(lpca);
489                 llog_ctxt_put(ctxt);
490         } else {
491                 CDEBUG(D_HA, "Started llog_cat_process_thread(): %d\n", rc);
492                 rc = 0;
493         }
494
495         RETURN(rc);
496 }
497
498 int llog_obd_repl_connect(struct llog_ctxt *ctxt, int count,
499                           struct llog_logid *logid, struct llog_gen *gen,
500                           struct obd_uuid *uuid)
501 {
502         struct llog_canceld_ctxt *llcd;
503         int rc;
504         ENTRY;
505
506         mutex_down(&ctxt->loc_sem);
507
508         /* 
509          * Send back cached llcd before recovery from llog if we have any. 
510          */
511         if (ctxt->loc_llcd) {
512                 CWARN("Llcd %p:%p is not empty\n", ctxt->loc_llcd, ctxt);
513                 mutex_up(&ctxt->loc_sem);
514                 llog_sync(ctxt, NULL);
515                 mutex_down(&ctxt->loc_sem);
516         }
517
518         llcd = llcd_get(ctxt);
519         if (!llcd) {
520                 mutex_up(&ctxt->loc_sem);
521                 RETURN(-ENOMEM);
522         }
523
524         ctxt->loc_gen = *gen;
525
526         rc = llog_obd_repl_generic(ctxt, ctxt->llog_proc_cb, logid);
527         if (rc != 0) {
528                 llcd_put(ctxt);
529                 CERROR("Error recovery process: %d\n", rc);
530         }
531         mutex_up(&ctxt->loc_sem);
532         RETURN(rc);
533 }
534 EXPORT_SYMBOL(llog_obd_repl_connect);
535
536 /** 
537  * Deleted objects have a commit callback that cancels the MDS
538  * log record for the deletion. The commit callback calls this
539  * function.
540  */
541 int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
542                          struct lov_stripe_md *lsm, int count,
543                          struct llog_cookie *cookies, int flags)
544 {
545         struct llog_canceld_ctxt *llcd;
546         int rc = 0;
547         ENTRY;
548
549         LASSERT(ctxt != NULL);
550
551         mutex_down(&ctxt->loc_sem);
552         
553         /*
554          * Let's check if we have all structures alive. We also check for
555          * possible shutdown. Do nothing if we're stopping.
556          */
557         if (ctxt->loc_imp == NULL) {
558                 CDEBUG(D_RPCTRACE, "No import for ctxt %p\n", ctxt);
559                 GOTO(out, rc = -ENODEV);
560         }
561
562         if (ctxt->loc_obd->obd_stopping) {
563                 CDEBUG(D_RPCTRACE, "Obd is stopping for ctxt %p\n", ctxt);
564                 GOTO(out, rc = -ENODEV);
565         }
566
567         if (test_bit(LLOG_LCM_FL_EXIT, &ctxt->loc_lcm->lcm_flags)) {
568                 CDEBUG(D_RPCTRACE, "Commit thread is stopping for ctxt %p\n", 
569                        ctxt);
570                 GOTO(out, rc = -ENODEV);
571         }
572
573         llcd = ctxt->loc_llcd;
574
575         if (count > 0 && cookies != NULL) {
576                 /*
577                  * Get new llcd from ctxt if required. 
578                  */
579                 if (!llcd) {
580                         llcd = llcd_get(ctxt);
581                         if (!llcd)
582                                 GOTO(out, rc = -ENOMEM);
583                 }
584
585                 /*
586                  * Llcd does not have enough room for @cookies. Let's push 
587                  * it out and allocate new one. 
588                  */
589                 if (!llcd_fit(llcd, cookies)) {
590                         rc = llcd_push(ctxt);
591                         if (rc)
592                                 GOTO(out, rc);
593                         llcd = llcd_get(ctxt);
594                         if (!llcd)
595                                 GOTO(out, rc = -ENOMEM);
596                 }
597
598                 /*
599                  * Copy cookies to @llcd, no matter old or new allocated one.
600                  */
601                 llcd_copy(llcd, cookies);
602         }
603
604         /*
605          * Let's check if we need to send copied @cookies asap. If yes - do it.
606          */
607         if (llcd && (flags & OBD_LLOG_FL_SENDNOW)) {
608                 rc = llcd_push(ctxt);
609                 if (rc)
610                         GOTO(out, rc);
611         }
612         EXIT;
613 out:
614         mutex_up(&ctxt->loc_sem);
615         return rc;
616 }
617 EXPORT_SYMBOL(llog_obd_repl_cancel);
618
619 int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp)
620 {
621         int rc = 0;
622         ENTRY;
623
624         if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {
625                 CDEBUG(D_RPCTRACE, "Reverse import disconnect\n");
626                 /*
627                  * Check for llcd which might be left attached to @ctxt.
628                  * Let's kill it.
629                  */
630                 mutex_down(&ctxt->loc_sem);
631                 llcd_put(ctxt);
632                 mutex_up(&ctxt->loc_sem);
633         } else {
634                 rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW);
635         }
636         RETURN(rc);
637 }
638 EXPORT_SYMBOL(llog_obd_repl_sync);
639
640 #else /* !__KERNEL__ */
641
642 int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
643                          struct lov_stripe_md *lsm, int count,
644                          struct llog_cookie *cookies, int flags)
645 {
646         return 0;
647 }
648 #endif
649
650 /**
651  * Module init time fucntion. Initializes slab for llcd objects.
652  */
653 int llog_recov_init(void)
654 {
655         int llcd_size;
656
657         llcd_size = CFS_PAGE_SIZE - 
658                 lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL);
659         llcd_size += offsetof(struct llog_canceld_ctxt, llcd_cookies);
660         llcd_cache = cfs_mem_cache_create("llcd_cache", llcd_size, 0, 0);
661         if (!llcd_cache) {
662                 CERROR("Error allocating llcd cache\n");
663                 return -ENOMEM;
664         }
665         return 0;
666 }
667
668 /**
669  * Module fini time fucntion. Releases slab for llcd objects.
670  */
671 void llog_recov_fini(void)
672 {
673         int count;
674
675         /*
676          * Kill llcd cache when thread is stopped and we're sure no 
677          * llcd in use left.
678          */
679         if (llcd_cache) {
680                 /*
681                  * In 2.6.22 cfs_mem_cache_destroy() will not return error
682                  * for busy resources. Let's check it another way.
683                  */
684                 count = atomic_read(&llcd_count);
685                 LASSERTF(count == 0, "Can't destroy llcd cache! Number of "
686                          "busy llcds: %d\n", count);
687                 cfs_mem_cache_destroy(llcd_cache);
688                 llcd_cache = NULL;
689         }
690 }