Whamcloud - gitweb
Branch b1_8_gate
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
50
51 extern struct list_head obd_types;
52 spinlock_t obd_types_lock;
53
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
58
59 struct list_head  obd_zombie_imports;
60 struct list_head  obd_zombie_exports;
61 spinlock_t        obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
63
64 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
65
66 /*
67  * support functions: we could use inter-module communication, but this
68  * is more portable to other OS's
69  */
70 static struct obd_device *obd_device_alloc(void)
71 {
72         struct obd_device *obd;
73
74         OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
75         if (obd != NULL) {
76                 obd->obd_magic = OBD_DEVICE_MAGIC;
77         }
78         return obd;
79 }
80 EXPORT_SYMBOL(obd_device_alloc);
81
82 static void obd_device_free(struct obd_device *obd)
83 {
84         LASSERT(obd != NULL);
85         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
86                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
87         if (obd->obd_namespace != NULL) {
88                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89                        obd, obd->obd_namespace, obd->obd_force);
90                 LBUG();
91         }
92         lu_ref_fini(&obd->obd_reference);
93         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
94 }
95
96 struct obd_type *class_search_type(const char *name)
97 {
98         struct list_head *tmp;
99         struct obd_type *type;
100
101         spin_lock(&obd_types_lock);
102         list_for_each(tmp, &obd_types) {
103                 type = list_entry(tmp, struct obd_type, typ_chain);
104                 if (strcmp(type->typ_name, name) == 0) {
105                         spin_unlock(&obd_types_lock);
106                         return type;
107                 }
108         }
109         spin_unlock(&obd_types_lock);
110         return NULL;
111 }
112
113 struct obd_type *class_get_type(const char *name)
114 {
115         struct obd_type *type = class_search_type(name);
116
117 #ifdef CONFIG_KMOD
118         if (!type) {
119                 const char *modname = name;
120                 if (!request_module(modname)) {
121                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
122                         type = class_search_type(name);
123                 } else {
124                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
125                                            modname);
126                 }
127         }
128 #endif
129         if (type) {
130                 spin_lock(&type->obd_type_lock);
131                 type->typ_refcnt++;
132                 try_module_get(type->typ_dt_ops->o_owner);
133                 spin_unlock(&type->obd_type_lock);
134         }
135         return type;
136 }
137
138 void class_put_type(struct obd_type *type)
139 {
140         LASSERT(type);
141         spin_lock(&type->obd_type_lock);
142         type->typ_refcnt--;
143         module_put(type->typ_dt_ops->o_owner);
144         spin_unlock(&type->obd_type_lock);
145 }
146
147 #define CLASS_MAX_NAME 1024
148
149 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
150                         struct lprocfs_vars *vars, const char *name,
151                         struct lu_device_type *ldt)
152 {
153         struct obd_type *type;
154         int rc = 0;
155         ENTRY;
156
157         /* sanity check */
158         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
159
160         if (class_search_type(name)) {
161                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
162                 RETURN(-EEXIST);
163         }
164
165         rc = -ENOMEM;
166         OBD_ALLOC(type, sizeof(*type));
167         if (type == NULL)
168                 RETURN(rc);
169
170         OBD_ALLOC_PTR(type->typ_dt_ops);
171         OBD_ALLOC_PTR(type->typ_md_ops);
172         OBD_ALLOC(type->typ_name, strlen(name) + 1);
173
174         if (type->typ_dt_ops == NULL ||
175             type->typ_md_ops == NULL ||
176             type->typ_name == NULL)
177                 GOTO (failed, rc);
178
179         *(type->typ_dt_ops) = *dt_ops;
180         /* md_ops is optional */
181         if (md_ops)
182                 *(type->typ_md_ops) = *md_ops;
183         strcpy(type->typ_name, name);
184         spin_lock_init(&type->obd_type_lock);
185
186 #ifdef LPROCFS
187         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
188                                               vars, type);
189         if (IS_ERR(type->typ_procroot)) {
190                 rc = PTR_ERR(type->typ_procroot);
191                 type->typ_procroot = NULL;
192                 GOTO (failed, rc);
193         }
194 #endif
195         if (ldt != NULL) {
196                 type->typ_lu = ldt;
197                 rc = lu_device_type_init(ldt);
198                 if (rc != 0)
199                         GOTO (failed, rc);
200         }
201
202         spin_lock(&obd_types_lock);
203         list_add(&type->typ_chain, &obd_types);
204         spin_unlock(&obd_types_lock);
205
206         RETURN (0);
207
208  failed:
209         if (type->typ_name != NULL)
210                 OBD_FREE(type->typ_name, strlen(name) + 1);
211         if (type->typ_md_ops != NULL)
212                 OBD_FREE_PTR(type->typ_md_ops);
213         if (type->typ_dt_ops != NULL)
214                 OBD_FREE_PTR(type->typ_dt_ops);
215         OBD_FREE(type, sizeof(*type));
216         RETURN(rc);
217 }
218
219 int class_unregister_type(const char *name)
220 {
221         struct obd_type *type = class_search_type(name);
222         ENTRY;
223
224         if (!type) {
225                 CERROR("unknown obd type\n");
226                 RETURN(-EINVAL);
227         }
228
229         if (type->typ_refcnt) {
230                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
231                 /* This is a bad situation, let's make the best of it */
232                 /* Remove ops, but leave the name for debugging */
233                 OBD_FREE_PTR(type->typ_dt_ops);
234                 OBD_FREE_PTR(type->typ_md_ops);
235                 RETURN(-EBUSY);
236         }
237
238         if (type->typ_procroot) {
239                 lprocfs_remove(&type->typ_procroot);
240         }
241
242         if (type->typ_lu)
243                 lu_device_type_fini(type->typ_lu);
244
245         spin_lock(&obd_types_lock);
246         list_del(&type->typ_chain);
247         spin_unlock(&obd_types_lock);
248         OBD_FREE(type->typ_name, strlen(name) + 1);
249         if (type->typ_dt_ops != NULL)
250                 OBD_FREE_PTR(type->typ_dt_ops);
251         if (type->typ_md_ops != NULL)
252                 OBD_FREE_PTR(type->typ_md_ops);
253         OBD_FREE(type, sizeof(*type));
254         RETURN(0);
255 } /* class_unregister_type */
256
257 /**
258  * Create a new obd device.
259  *
260  * Find an empty slot in ::obd_devs[], create a new obd device in it.
261  *
262  * \param typename [in] obd device type string.
263  * \param name     [in] obd device name.
264  *
265  * \retval NULL if create fails, otherwise return the obd device
266  *         pointer created.
267  */
268 struct obd_device *class_newdev(const char *type_name, const char *name)
269 {
270         struct obd_device *result = NULL;
271         struct obd_device *newdev;
272         struct obd_type *type = NULL;
273         int i;
274         int new_obd_minor = 0;
275
276         if (strlen(name) >= MAX_OBD_NAME) {
277                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
278                 RETURN(ERR_PTR(-EINVAL));
279         }
280
281         type = class_get_type(type_name);
282         if (type == NULL){
283                 CERROR("OBD: unknown type: %s\n", type_name);
284                 RETURN(ERR_PTR(-ENODEV));
285         }
286
287         newdev = obd_device_alloc();
288         if (newdev == NULL) {
289                 class_put_type(type);
290                 RETURN(ERR_PTR(-ENOMEM));
291         }
292         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
293
294         spin_lock(&obd_dev_lock);
295         for (i = 0; i < class_devno_max(); i++) {
296                 struct obd_device *obd = class_num2obd(i);
297                 if (obd && obd->obd_name &&
298                     (strcmp(name, obd->obd_name) == 0)) {
299                         CERROR("Device %s already exists, won't add\n", name);
300                         if (result) {
301                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
302                                          "%p obd_magic %08x != %08x\n", result,
303                                          result->obd_magic, OBD_DEVICE_MAGIC);
304                                 LASSERTF(result->obd_minor == new_obd_minor,
305                                          "%p obd_minor %d != %d\n", result,
306                                          result->obd_minor, new_obd_minor);
307
308                                 obd_devs[result->obd_minor] = NULL;
309                                 result->obd_name[0]='\0';
310                          }
311                         result = ERR_PTR(-EEXIST);
312                         break;
313                 }
314                 if (!result && !obd) {
315                         result = newdev;
316                         result->obd_minor = i;
317                         new_obd_minor = i;
318                         result->obd_type = type;
319                         strncpy(result->obd_name, name,
320                                 sizeof(result->obd_name) - 1);
321                         obd_devs[i] = result;
322                 }
323         }
324         spin_unlock(&obd_dev_lock);
325
326         if (result == NULL && i >= class_devno_max()) {
327                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
328                        class_devno_max());
329                 result = ERR_PTR(-EOVERFLOW);
330         }
331
332         if (IS_ERR(result)) {
333                 obd_device_free(newdev);
334                 class_put_type(type);
335         } else {
336                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
337                        result->obd_name, result);
338         }
339         return result;
340 }
341
342 void class_release_dev(struct obd_device *obd)
343 {
344         struct obd_type *obd_type = obd->obd_type;
345
346         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
347                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
348         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
349                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
350         LASSERT(obd_type != NULL);
351
352         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
353                obd->obd_name,obd->obd_type->typ_name);
354
355         spin_lock(&obd_dev_lock);
356         obd_devs[obd->obd_minor] = NULL;
357         spin_unlock(&obd_dev_lock);
358         obd_device_free(obd);
359
360         class_put_type(obd_type);
361 }
362
363 int class_name2dev(const char *name)
364 {
365         int i;
366
367         if (!name)
368                 return -1;
369
370         spin_lock(&obd_dev_lock);
371         for (i = 0; i < class_devno_max(); i++) {
372                 struct obd_device *obd = class_num2obd(i);
373                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
374                         /* Make sure we finished attaching before we give
375                            out any references */
376                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
377                         if (obd->obd_attached) {
378                                 spin_unlock(&obd_dev_lock);
379                                 return i;
380                         }
381                         break;
382                 }
383         }
384         spin_unlock(&obd_dev_lock);
385
386         return -1;
387 }
388
389 struct obd_device *class_name2obd(const char *name)
390 {
391         int dev = class_name2dev(name);
392
393         if (dev < 0 || dev > class_devno_max())
394                 return NULL;
395         return class_num2obd(dev);
396 }
397
398 int class_uuid2dev(struct obd_uuid *uuid)
399 {
400         int i;
401
402         spin_lock(&obd_dev_lock);
403         for (i = 0; i < class_devno_max(); i++) {
404                 struct obd_device *obd = class_num2obd(i);
405                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
406                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
407                         spin_unlock(&obd_dev_lock);
408                         return i;
409                 }
410         }
411         spin_unlock(&obd_dev_lock);
412
413         return -1;
414 }
415
416 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
417 {
418         int dev = class_uuid2dev(uuid);
419         if (dev < 0)
420                 return NULL;
421         return class_num2obd(dev);
422 }
423
424 /**
425  * Get obd device from ::obd_devs[]
426  *
427  * \param num [in] array index
428  *
429  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
430  *         otherwise return the obd device there.
431  */
432 struct obd_device *class_num2obd(int num)
433 {
434         struct obd_device *obd = NULL;
435
436         if (num < class_devno_max()) {
437                 obd = obd_devs[num];
438                 if (obd == NULL)
439                         return NULL;
440
441                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
442                          "%p obd_magic %08x != %08x\n",
443                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
444                 LASSERTF(obd->obd_minor == num,
445                          "%p obd_minor %0d != %0d\n",
446                          obd, obd->obd_minor, num);
447         }
448
449         return obd;
450 }
451
452 void class_obd_list(void)
453 {
454         char *status;
455         int i;
456
457         spin_lock(&obd_dev_lock);
458         for (i = 0; i < class_devno_max(); i++) {
459                 struct obd_device *obd = class_num2obd(i);
460                 if (obd == NULL)
461                         continue;
462                 if (obd->obd_stopping)
463                         status = "ST";
464                 else if (obd->obd_set_up)
465                         status = "UP";
466                 else if (obd->obd_attached)
467                         status = "AT";
468                 else
469                         status = "--";
470                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
471                          i, status, obd->obd_type->typ_name,
472                          obd->obd_name, obd->obd_uuid.uuid,
473                          atomic_read(&obd->obd_refcount));
474         }
475         spin_unlock(&obd_dev_lock);
476         return;
477 }
478
479 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
480    specified, then only the client with that uuid is returned,
481    otherwise any client connected to the tgt is returned. */
482 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
483                                           const char * typ_name,
484                                           struct obd_uuid *grp_uuid)
485 {
486         int i;
487
488         spin_lock(&obd_dev_lock);
489         for (i = 0; i < class_devno_max(); i++) {
490                 struct obd_device *obd = class_num2obd(i);
491                 if (obd == NULL)
492                         continue;
493                 if ((strncmp(obd->obd_type->typ_name, typ_name,
494                              strlen(typ_name)) == 0)) {
495                         if (obd_uuid_equals(tgt_uuid,
496                                             &obd->u.cli.cl_target_uuid) &&
497                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
498                                                          &obd->obd_uuid) : 1)) {
499                                 spin_unlock(&obd_dev_lock);
500                                 return obd;
501                         }
502                 }
503         }
504         spin_unlock(&obd_dev_lock);
505
506         return NULL;
507 }
508
509 /* Iterate the obd_device list looking devices have grp_uuid. Start
510    searching at *next, and if a device is found, the next index to look
511    at is saved in *next. If next is NULL, then the first matching device
512    will always be returned. */
513 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
514 {
515         int i;
516
517         if (next == NULL)
518                 i = 0;
519         else if (*next >= 0 && *next < class_devno_max())
520                 i = *next;
521         else
522                 return NULL;
523
524         spin_lock(&obd_dev_lock);
525         for (; i < class_devno_max(); i++) {
526                 struct obd_device *obd = class_num2obd(i);
527                 if (obd == NULL)
528                         continue;
529                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
530                         if (next != NULL)
531                                 *next = i+1;
532                         spin_unlock(&obd_dev_lock);
533                         return obd;
534                 }
535         }
536         spin_unlock(&obd_dev_lock);
537
538         return NULL;
539 }
540
541
542 void obd_cleanup_caches(void)
543 {
544         int rc;
545
546         ENTRY;
547         if (obd_device_cachep) {
548                 rc = cfs_mem_cache_destroy(obd_device_cachep);
549                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
550                 obd_device_cachep = NULL;
551         }
552         if (obdo_cachep) {
553                 rc = cfs_mem_cache_destroy(obdo_cachep);
554                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
555                 obdo_cachep = NULL;
556         }
557         if (import_cachep) {
558                 rc = cfs_mem_cache_destroy(import_cachep);
559                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
560                 import_cachep = NULL;
561         }
562         if (capa_cachep) {
563                 rc = cfs_mem_cache_destroy(capa_cachep);
564                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
565                 capa_cachep = NULL;
566         }
567         EXIT;
568 }
569
570 int obd_init_caches(void)
571 {
572         ENTRY;
573
574         LASSERT(obd_device_cachep == NULL);
575         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
576                                                  sizeof(struct obd_device),
577                                                  0, 0);
578         if (!obd_device_cachep)
579                 GOTO(out, -ENOMEM);
580
581         LASSERT(obdo_cachep == NULL);
582         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
583                                            0, 0);
584         if (!obdo_cachep)
585                 GOTO(out, -ENOMEM);
586
587         LASSERT(import_cachep == NULL);
588         import_cachep = cfs_mem_cache_create("ll_import_cache",
589                                              sizeof(struct obd_import),
590                                              0, 0);
591         if (!import_cachep)
592                 GOTO(out, -ENOMEM);
593
594         LASSERT(capa_cachep == NULL);
595         capa_cachep = cfs_mem_cache_create("capa_cache",
596                                            sizeof(struct obd_capa), 0, 0);
597         if (!capa_cachep)
598                 GOTO(out, -ENOMEM);
599
600         RETURN(0);
601  out:
602         obd_cleanup_caches();
603         RETURN(-ENOMEM);
604
605 }
606
607 /* map connection to client */
608 struct obd_export *class_conn2export(struct lustre_handle *conn)
609 {
610         struct obd_export *export;
611         ENTRY;
612
613         if (!conn) {
614                 CDEBUG(D_CACHE, "looking for null handle\n");
615                 RETURN(NULL);
616         }
617
618         if (conn->cookie == -1) {  /* this means assign a new connection */
619                 CDEBUG(D_CACHE, "want a new connection\n");
620                 RETURN(NULL);
621         }
622
623         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
624         export = class_handle2object(conn->cookie);
625         RETURN(export);
626 }
627
628 struct obd_device *class_exp2obd(struct obd_export *exp)
629 {
630         if (exp)
631                 return exp->exp_obd;
632         return NULL;
633 }
634
635 struct obd_device *class_conn2obd(struct lustre_handle *conn)
636 {
637         struct obd_export *export;
638         export = class_conn2export(conn);
639         if (export) {
640                 struct obd_device *obd = export->exp_obd;
641                 class_export_put(export);
642                 return obd;
643         }
644         return NULL;
645 }
646
647 struct obd_import *class_exp2cliimp(struct obd_export *exp)
648 {
649         struct obd_device *obd = exp->exp_obd;
650         if (obd == NULL)
651                 return NULL;
652         return obd->u.cli.cl_import;
653 }
654
655 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
656 {
657         struct obd_device *obd = class_conn2obd(conn);
658         if (obd == NULL)
659                 return NULL;
660         return obd->u.cli.cl_import;
661 }
662
663 /* Export management functions */
664 static void export_handle_addref(void *export)
665 {
666         class_export_get(export);
667 }
668
669 struct obd_export *class_export_get(struct obd_export *exp)
670 {
671         atomic_inc(&exp->exp_refcount);
672         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
673                atomic_read(&exp->exp_refcount));
674         return exp;
675 }
676 EXPORT_SYMBOL(class_export_get);
677
678 void class_export_put(struct obd_export *exp)
679 {
680         LASSERT(exp != NULL);
681         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
682                atomic_read(&exp->exp_refcount) - 1);
683         LASSERT(atomic_read(&exp->exp_refcount) > 0);
684         LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
685
686         if (atomic_dec_and_test(&exp->exp_refcount)) {
687                 LASSERT (list_empty(&exp->exp_obd_chain));
688
689                 CDEBUG(D_IOCTL, "final put %p/%s\n",
690                        exp, exp->exp_client_uuid.uuid);
691
692                 spin_lock(&obd_zombie_impexp_lock);
693                 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
694                 spin_unlock(&obd_zombie_impexp_lock);
695
696                 if (obd_zombie_impexp_notify != NULL)
697                         obd_zombie_impexp_notify();
698         }
699 }
700 EXPORT_SYMBOL(class_export_put);
701
702 static void class_export_destroy(struct obd_export *exp)
703 {
704         struct obd_device *obd = exp->exp_obd;
705         ENTRY;
706
707         LASSERT (atomic_read(&exp->exp_refcount) == 0);
708
709         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
710                exp->exp_client_uuid.uuid, obd->obd_name);
711
712         LASSERT(obd != NULL);
713
714         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
715         if (exp->exp_connection)
716                 ptlrpc_put_connection_superhack(exp->exp_connection);
717
718         LASSERT(list_empty(&exp->exp_outstanding_replies));
719         LASSERT(list_empty(&exp->exp_req_replay_queue));
720         LASSERT(list_empty(&exp->exp_queued_rpc));
721         obd_destroy_export(exp);
722         class_decref(obd, "export", exp);
723
724         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
725         EXIT;
726 }
727
728 /* Creates a new export, adds it to the hash table, and returns a
729  * pointer to it. The refcount is 2: one for the hash reference, and
730  * one for the pointer returned by this function. */
731 struct obd_export *class_new_export(struct obd_device *obd,
732                                     struct obd_uuid *cluuid)
733 {
734         struct obd_export *export;
735         int rc = 0;
736
737         OBD_ALLOC_PTR(export);
738         if (!export)
739                 return ERR_PTR(-ENOMEM);
740
741         export->exp_conn_cnt = 0;
742         export->exp_lock_hash = NULL;
743         atomic_set(&export->exp_refcount, 2);
744         atomic_set(&export->exp_rpc_count, 0);
745         export->exp_obd = obd;
746         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
747         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
748         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
749         CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
750         class_handle_hash(&export->exp_handle, export_handle_addref);
751         export->exp_last_request_time = cfs_time_current_sec();
752         spin_lock_init(&export->exp_lock);
753         INIT_HLIST_NODE(&export->exp_uuid_hash);
754         INIT_HLIST_NODE(&export->exp_nid_hash);
755
756         export->exp_sp_peer = LUSTRE_SP_ANY;
757         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
758         export->exp_client_uuid = *cluuid;
759         obd_init_export(export);
760
761         spin_lock(&obd->obd_dev_lock);
762         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
763                 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
764                                             &export->exp_uuid_hash);
765                 if (rc != 0) {
766                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
767                                       obd->obd_name, cluuid->uuid, rc);
768                         spin_unlock(&obd->obd_dev_lock);
769                         class_handle_unhash(&export->exp_handle);
770                         OBD_FREE_PTR(export);
771                         return ERR_PTR(-EALREADY);
772                 }
773         }
774
775         LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
776         class_incref(obd, "export", export);
777         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
778         list_add_tail(&export->exp_obd_chain_timed,
779                       &export->exp_obd->obd_exports_timed);
780         export->exp_obd->obd_num_exports++;
781         spin_unlock(&obd->obd_dev_lock);
782
783         return export;
784 }
785 EXPORT_SYMBOL(class_new_export);
786
787 void class_unlink_export(struct obd_export *exp)
788 {
789         class_handle_unhash(&exp->exp_handle);
790
791         spin_lock(&exp->exp_obd->obd_dev_lock);
792         /* delete an uuid-export hashitem from hashtables */
793         if (!hlist_unhashed(&exp->exp_uuid_hash))
794                 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
795                                 &exp->exp_client_uuid,
796                                 &exp->exp_uuid_hash);
797
798         list_del_init(&exp->exp_obd_chain);
799         list_del_init(&exp->exp_obd_chain_timed);
800         exp->exp_obd->obd_num_exports--;
801         spin_unlock(&exp->exp_obd->obd_dev_lock);
802
803         class_export_put(exp);
804 }
805 EXPORT_SYMBOL(class_unlink_export);
806
807 /* Import management functions */
808 static void import_handle_addref(void *import)
809 {
810         class_import_get(import);
811 }
812
813 struct obd_import *class_import_get(struct obd_import *import)
814 {
815         LASSERT(atomic_read(&import->imp_refcount) >= 0);
816         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
817         atomic_inc(&import->imp_refcount);
818         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
819                atomic_read(&import->imp_refcount), 
820                import->imp_obd->obd_name);
821         return import;
822 }
823 EXPORT_SYMBOL(class_import_get);
824
825 void class_import_put(struct obd_import *import)
826 {
827         ENTRY;
828
829         LASSERT(atomic_read(&import->imp_refcount) > 0);
830         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
831         LASSERT(list_empty(&import->imp_zombie_chain));
832
833         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
834                atomic_read(&import->imp_refcount) - 1, 
835                import->imp_obd->obd_name);
836
837         if (atomic_dec_and_test(&import->imp_refcount)) {
838                 CDEBUG(D_INFO, "final put import %p\n", import);
839                 spin_lock(&obd_zombie_impexp_lock);
840                 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
841                 spin_unlock(&obd_zombie_impexp_lock);
842
843                 if (obd_zombie_impexp_notify != NULL)
844                         obd_zombie_impexp_notify();
845         }
846
847         EXIT;
848 }
849 EXPORT_SYMBOL(class_import_put);
850
851 void class_import_destroy(struct obd_import *import)
852 {
853         ENTRY;
854
855         CDEBUG(D_IOCTL, "destroying import %p for %s\n", import,
856                 import->imp_obd->obd_name);
857
858         LASSERT(atomic_read(&import->imp_refcount) == 0);
859
860         ptlrpc_put_connection_superhack(import->imp_connection);
861
862         while (!list_empty(&import->imp_conn_list)) {
863                 struct obd_import_conn *imp_conn;
864
865                 imp_conn = list_entry(import->imp_conn_list.next,
866                                       struct obd_import_conn, oic_item);
867                 list_del(&imp_conn->oic_item);
868                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
869                 OBD_FREE(imp_conn, sizeof(*imp_conn));
870         }
871
872         LASSERT(import->imp_sec == NULL);
873         class_decref(import->imp_obd, "import", import);
874         OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
875         EXIT;
876 }
877
878 static void init_imp_at(struct imp_at *at) {
879         int i;
880         at_init(&at->iat_net_latency, 0, 0);
881         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
882                 /* max service estimates are tracked on the server side, so
883                    don't use the AT history here, just use the last reported
884                    val. (But keep hist for proc histogram, worst_ever) */
885                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
886                         AT_FLG_NOHIST);
887         }
888 }
889
890 struct obd_import *class_new_import(struct obd_device *obd)
891 {
892         struct obd_import *imp;
893
894         OBD_ALLOC(imp, sizeof(*imp));
895         if (imp == NULL)
896                 return NULL;
897
898         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
899         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
900         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
901         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
902         spin_lock_init(&imp->imp_lock);
903         imp->imp_last_success_conn = 0;
904         imp->imp_state = LUSTRE_IMP_NEW;
905         imp->imp_obd = class_incref(obd, "import", imp);
906         sema_init(&imp->imp_sec_mutex, 1);
907         cfs_waitq_init(&imp->imp_recovery_waitq);
908
909         atomic_set(&imp->imp_refcount, 2);
910         atomic_set(&imp->imp_unregistering, 0);
911         atomic_set(&imp->imp_inflight, 0);
912         atomic_set(&imp->imp_replay_inflight, 0);
913         atomic_set(&imp->imp_inval_count, 0);
914         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
915         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
916         class_handle_hash(&imp->imp_handle, import_handle_addref);
917         init_imp_at(&imp->imp_at);
918
919         /* the default magic is V2, will be used in connect RPC, and
920          * then adjusted according to the flags in request/reply. */
921         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
922
923         return imp;
924 }
925 EXPORT_SYMBOL(class_new_import);
926
927 void class_destroy_import(struct obd_import *import)
928 {
929         LASSERT(import != NULL);
930         LASSERT(import != LP_POISON);
931
932         class_handle_unhash(&import->imp_handle);
933
934         spin_lock(&import->imp_lock);
935         import->imp_generation++;
936         spin_unlock(&import->imp_lock);
937         class_import_put(import);
938 }
939 EXPORT_SYMBOL(class_destroy_import);
940
941 /* A connection defines an export context in which preallocation can
942    be managed. This releases the export pointer reference, and returns
943    the export handle, so the export refcount is 1 when this function
944    returns. */
945 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
946                   struct obd_uuid *cluuid)
947 {
948         struct obd_export *export;
949         LASSERT(conn != NULL);
950         LASSERT(obd != NULL);
951         LASSERT(cluuid != NULL);
952         ENTRY;
953
954         export = class_new_export(obd, cluuid);
955         if (IS_ERR(export))
956                 RETURN(PTR_ERR(export));
957
958         conn->cookie = export->exp_handle.h_cookie;
959         class_export_put(export);
960
961         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
962                cluuid->uuid, conn->cookie);
963         RETURN(0);
964 }
965 EXPORT_SYMBOL(class_connect);
966
967 /* if export is involved in recovery then clean up related things */
968 void class_export_recovery_cleanup(struct obd_export *exp)
969 {
970         struct obd_device *obd = exp->exp_obd;
971
972         spin_lock_bh(&obd->obd_processing_task_lock);
973         if (obd->obd_recovering && exp->exp_in_recovery) {
974                 spin_lock(&exp->exp_lock);
975                 exp->exp_in_recovery = 0;
976                 spin_unlock(&exp->exp_lock);
977                 obd->obd_connected_clients--;
978                 /* each connected client is counted as recoverable */
979                 obd->obd_recoverable_clients--;
980                 if (exp->exp_req_replay_needed) {
981                         spin_lock(&exp->exp_lock);
982                         exp->exp_req_replay_needed = 0;
983                         spin_unlock(&exp->exp_lock);
984                         LASSERT(atomic_read(&obd->obd_req_replay_clients));
985                         atomic_dec(&obd->obd_req_replay_clients);
986                 }
987                 if (exp->exp_lock_replay_needed) {
988                         spin_lock(&exp->exp_lock);
989                         exp->exp_lock_replay_needed = 0;
990                         spin_unlock(&exp->exp_lock);
991                         LASSERT(atomic_read(&obd->obd_lock_replay_clients));
992                         atomic_dec(&obd->obd_lock_replay_clients);
993                 }
994         }
995         spin_unlock_bh(&obd->obd_processing_task_lock);
996 }
997
998 /* This function removes two references from the export: one for the
999  * hash entry and one for the export pointer passed in.  The export
1000  * pointer passed to this function is destroyed should not be used
1001  * again. */
1002 int class_disconnect(struct obd_export *export)
1003 {
1004         int already_disconnected;
1005         ENTRY;
1006
1007         if (export == NULL) {
1008                 fixme();
1009                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1010                 RETURN(-EINVAL);
1011         }
1012
1013         spin_lock(&export->exp_lock);
1014         already_disconnected = export->exp_disconnected;
1015         export->exp_disconnected = 1;
1016
1017         if (!hlist_unhashed(&export->exp_nid_hash))
1018                 lustre_hash_del(export->exp_obd->obd_nid_hash,
1019                                 &export->exp_connection->c_peer.nid,
1020                                 &export->exp_nid_hash);
1021
1022         spin_unlock(&export->exp_lock);
1023
1024         /* class_cleanup(), abort_recovery(), and class_fail_export()
1025          * all end up in here, and if any of them race we shouldn't
1026          * call extra class_export_puts(). */
1027         if (already_disconnected)
1028                 RETURN(0);
1029
1030         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1031                export->exp_handle.h_cookie);
1032
1033         class_export_recovery_cleanup(export);
1034         class_unlink_export(export);
1035         class_export_put(export);
1036         RETURN(0);
1037 }
1038
1039 static void class_disconnect_export_list(struct list_head *list, int flags)
1040 {
1041         int rc;
1042         struct lustre_handle fake_conn;
1043         struct obd_export *fake_exp, *exp;
1044         ENTRY;
1045
1046         /* It's possible that an export may disconnect itself, but
1047          * nothing else will be added to this list. */
1048         while (!list_empty(list)) {
1049                 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1050                 class_export_get(exp);
1051
1052                 spin_lock(&exp->exp_lock);
1053                 exp->exp_flags = flags;
1054                 spin_unlock(&exp->exp_lock);
1055
1056                 if (obd_uuid_equals(&exp->exp_client_uuid,
1057                                     &exp->exp_obd->obd_uuid)) {
1058                         CDEBUG(D_HA,
1059                                "exp %p export uuid == obd uuid, don't discon\n",
1060                                exp);
1061                         /* Need to delete this now so we don't end up pointing
1062                          * to work_list later when this export is cleaned up. */
1063                         list_del_init(&exp->exp_obd_chain);
1064                         class_export_put(exp);
1065                         continue;
1066                 }
1067
1068                 fake_conn.cookie = exp->exp_handle.h_cookie;
1069                 fake_exp = class_conn2export(&fake_conn);
1070                 if (!fake_exp) {
1071                         class_export_put(exp);
1072                         continue;
1073                 }
1074
1075                 spin_lock(&fake_exp->exp_lock);
1076                 fake_exp->exp_flags = flags;
1077                 spin_unlock(&fake_exp->exp_lock);
1078
1079                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1080                        "last request at "CFS_TIME_T"\n",
1081                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1082                        exp, exp->exp_last_request_time);
1083                 rc = obd_disconnect(fake_exp);
1084                 class_export_put(exp);
1085         }
1086         EXIT;
1087 }
1088
1089 static inline int get_exp_flags_from_obd(struct obd_device *obd)
1090 {
1091         return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
1092                 (obd->obd_force ? OBD_OPT_FORCE : 0));
1093 }
1094
1095 void class_disconnect_exports(struct obd_device *obd)
1096 {
1097         struct list_head work_list;
1098         ENTRY;
1099
1100         /* Move all of the exports from obd_exports to a work list, en masse. */
1101         spin_lock(&obd->obd_dev_lock);
1102         list_add(&work_list, &obd->obd_exports);
1103         list_del_init(&obd->obd_exports);
1104         spin_unlock(&obd->obd_dev_lock);
1105
1106         if (!list_empty(&work_list)) {
1107                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1108                        "disconnecting them\n", obd->obd_minor, obd);
1109                 class_disconnect_export_list(&work_list,
1110                                              get_exp_flags_from_obd(obd));
1111         } else
1112                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1113                        obd->obd_minor, obd);
1114         EXIT;
1115 }
1116 EXPORT_SYMBOL(class_disconnect_exports);
1117
1118 /* Remove exports that have not completed recovery.
1119  */
1120 int class_disconnect_stale_exports(struct obd_device *obd,
1121                                    int (*test_export)(struct obd_export *))
1122 {
1123         struct list_head work_list;
1124         struct list_head *pos, *n;
1125         struct obd_export *exp;
1126         int cnt = 0;
1127         ENTRY;
1128
1129         CFS_INIT_LIST_HEAD(&work_list);
1130         spin_lock(&obd->obd_dev_lock);
1131         list_for_each_safe(pos, n, &obd->obd_exports) {
1132                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1133                 if (test_export(exp))
1134                         continue;
1135
1136                 list_del(&exp->exp_obd_chain);
1137                 list_add(&exp->exp_obd_chain, &work_list);
1138                 /* don't count self-export as client */
1139                 if (obd_uuid_equals(&exp->exp_client_uuid,
1140                                      &exp->exp_obd->obd_uuid))
1141                         continue;
1142
1143                 cnt++;
1144                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1145                        obd->obd_name, exp->exp_client_uuid.uuid,
1146                        exp->exp_connection == NULL ? "<unknown>" :
1147                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1148         }
1149         spin_unlock(&obd->obd_dev_lock);
1150
1151         CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
1152                obd->obd_name, cnt);
1153         class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
1154         RETURN(cnt);
1155 }
1156 EXPORT_SYMBOL(class_disconnect_stale_exports);
1157
1158 void class_fail_export(struct obd_export *exp)
1159 {
1160         int rc, already_failed;
1161
1162         spin_lock(&exp->exp_lock);
1163         already_failed = exp->exp_failed;
1164         exp->exp_failed = 1;
1165         spin_unlock(&exp->exp_lock);
1166
1167         if (already_failed) {
1168                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1169                        exp, exp->exp_client_uuid.uuid);
1170                 return;
1171         }
1172
1173         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1174                exp, exp->exp_client_uuid.uuid);
1175
1176         if (obd_dump_on_timeout)
1177                 libcfs_debug_dumplog();
1178
1179         /* Most callers into obd_disconnect are removing their own reference
1180          * (request, for example) in addition to the one from the hash table.
1181          * We don't have such a reference here, so make one. */
1182         class_export_get(exp);
1183         rc = obd_disconnect(exp);
1184         if (rc)
1185                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1186         else
1187                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1188                        exp, exp->exp_client_uuid.uuid);
1189 }
1190 EXPORT_SYMBOL(class_fail_export);
1191
1192 char *obd_export_nid2str(struct obd_export *exp)
1193 {
1194         if (exp->exp_connection != NULL)
1195                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1196
1197         return "(no nid)";
1198 }
1199 EXPORT_SYMBOL(obd_export_nid2str);
1200
1201 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1202 {
1203         struct obd_export *doomed_exp = NULL;
1204         int exports_evicted = 0;
1205
1206         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1207
1208         do {
1209                 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1210                 if (doomed_exp == NULL)
1211                         break;
1212
1213                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1214                          "nid %s found, wanted nid %s, requested nid %s\n",
1215                          obd_export_nid2str(doomed_exp),
1216                          libcfs_nid2str(nid_key), nid);
1217                 LASSERTF(doomed_exp != obd->obd_self_export,
1218                          "self-export is hashed by NID?\n");
1219                 exports_evicted++;
1220                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1221                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1222                        exports_evicted);
1223                 class_fail_export(doomed_exp);
1224                 class_export_put(doomed_exp);
1225         } while (1);
1226
1227         if (!exports_evicted)
1228                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1229                        obd->obd_name, nid);
1230         return exports_evicted;
1231 }
1232 EXPORT_SYMBOL(obd_export_evict_by_nid);
1233
1234 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1235 {
1236         struct obd_export *doomed_exp = NULL;
1237         struct obd_uuid doomed_uuid;
1238         int exports_evicted = 0;
1239
1240         obd_str2uuid(&doomed_uuid, uuid);
1241         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1242                 CERROR("%s: can't evict myself\n", obd->obd_name);
1243                 return exports_evicted;
1244         }
1245
1246         doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1247
1248         if (doomed_exp == NULL) {
1249                 CERROR("%s: can't disconnect %s: no exports found\n",
1250                        obd->obd_name, uuid);
1251         } else {
1252                 CWARN("%s: evicting %s at adminstrative request\n",
1253                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1254                 class_fail_export(doomed_exp);
1255                 class_export_put(doomed_exp);
1256                 exports_evicted++;
1257         }
1258
1259         return exports_evicted;
1260 }
1261 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1262
1263 /**
1264  * kill zombie imports and exports
1265  */
1266 void obd_zombie_impexp_cull(void)
1267 {
1268         struct obd_import *import;
1269         struct obd_export *export;
1270         ENTRY;
1271
1272         do {
1273                 spin_lock (&obd_zombie_impexp_lock);
1274
1275                 import = NULL;
1276                 if (!list_empty(&obd_zombie_imports)) {
1277                         import = list_entry(obd_zombie_imports.next,
1278                                             struct obd_import,
1279                                             imp_zombie_chain);
1280                         list_del(&import->imp_zombie_chain);
1281                 }
1282
1283                 export = NULL;
1284                 if (!list_empty(&obd_zombie_exports)) {
1285                         export = list_entry(obd_zombie_exports.next,
1286                                             struct obd_export,
1287                                             exp_obd_chain);
1288                         list_del_init(&export->exp_obd_chain);
1289                 }
1290
1291                 spin_unlock(&obd_zombie_impexp_lock);
1292
1293                 if (import != NULL)
1294                         class_import_destroy(import);
1295
1296                 if (export != NULL)
1297                         class_export_destroy(export);
1298
1299         } while (import != NULL || export != NULL);
1300         EXIT;
1301 }
1302
1303 static struct completion        obd_zombie_start;
1304 static struct completion        obd_zombie_stop;
1305 static unsigned long            obd_zombie_flags;
1306 static cfs_waitq_t              obd_zombie_waitq;
1307
1308 enum {
1309         OBD_ZOMBIE_STOP = 1
1310 };
1311
1312 /**
1313  * check for work for kill zombie import/export thread.
1314  */
1315 static int obd_zombie_impexp_check(void *arg)
1316 {
1317         int rc;
1318
1319         spin_lock(&obd_zombie_impexp_lock);
1320         rc = list_empty(&obd_zombie_imports) &&
1321              list_empty(&obd_zombie_exports) &&
1322              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1323
1324         spin_unlock(&obd_zombie_impexp_lock);
1325
1326         RETURN(rc);
1327 }
1328
1329 /**
1330  * notify import/export destroy thread about new zombie.
1331  */
1332 static void obd_zombie_impexp_notify(void)
1333 {
1334         cfs_waitq_signal(&obd_zombie_waitq);
1335 }
1336
1337 /**
1338  * check whether obd_zombie is idle
1339  */
1340 static int obd_zombie_is_idle(void)
1341 {
1342         int rc;
1343
1344         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1345         spin_lock(&obd_zombie_impexp_lock);
1346         rc = list_empty(&obd_zombie_imports) &&
1347              list_empty(&obd_zombie_exports);
1348         spin_unlock(&obd_zombie_impexp_lock);
1349         return rc;
1350 }
1351
1352 /**
1353  * wait when obd_zombie import/export queues become empty
1354  */
1355 void obd_zombie_barrier(void)
1356 {
1357         struct l_wait_info lwi = { 0 };
1358
1359         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1360 }
1361 EXPORT_SYMBOL(obd_zombie_barrier);
1362
1363 #ifdef __KERNEL__
1364
1365 /**
1366  * destroy zombie export/import thread.
1367  */
1368 static int obd_zombie_impexp_thread(void *unused)
1369 {
1370         int rc;
1371
1372         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1373                 complete(&obd_zombie_start);
1374                 RETURN(rc);
1375         }
1376
1377         complete(&obd_zombie_start);
1378
1379         while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1380                 struct l_wait_info lwi = { 0 };
1381
1382                 l_wait_event(obd_zombie_waitq, !obd_zombie_impexp_check(NULL), &lwi);
1383
1384                 obd_zombie_impexp_cull();
1385                 /* Notify obd_zombie_barrier callers that queues may be empty */
1386                 cfs_waitq_signal(&obd_zombie_waitq);
1387         }
1388
1389         complete(&obd_zombie_stop);
1390
1391         RETURN(0);
1392 }
1393
1394 #else /* ! KERNEL */
1395
1396 static atomic_t zombie_recur = ATOMIC_INIT(0);
1397 static void *obd_zombie_impexp_work_cb;
1398 static void *obd_zombie_impexp_idle_cb;
1399
1400 int obd_zombie_impexp_kill(void *arg)
1401 {
1402         int rc = 0;
1403
1404         if (atomic_inc_return(&zombie_recur) == 1) {
1405                 obd_zombie_impexp_cull();
1406                 rc = 1;
1407         }
1408         atomic_dec(&zombie_recur);
1409         return rc;
1410 }
1411
1412 #endif
1413
1414 /**
1415  * start destroy zombie import/export thread
1416  */
1417 int obd_zombie_impexp_init(void)
1418 {
1419         int rc;
1420
1421         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1422         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1423         spin_lock_init(&obd_zombie_impexp_lock);
1424         init_completion(&obd_zombie_start);
1425         init_completion(&obd_zombie_stop);
1426         cfs_waitq_init(&obd_zombie_waitq);
1427
1428 #ifdef __KERNEL__
1429         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1430         if (rc < 0)
1431                 RETURN(rc);
1432
1433         wait_for_completion(&obd_zombie_start);
1434 #else
1435
1436         obd_zombie_impexp_work_cb =
1437                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1438                                                  &obd_zombie_impexp_kill, NULL);
1439
1440         obd_zombie_impexp_idle_cb =
1441                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1442                                                  &obd_zombie_impexp_check, NULL);
1443         rc = 0;
1444
1445 #endif
1446         RETURN(rc);
1447 }
1448 /**
1449  * stop destroy zombie import/export thread
1450  */
1451 void obd_zombie_impexp_stop(void)
1452 {
1453         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1454         obd_zombie_impexp_notify();
1455 #ifdef __KERNEL__
1456         wait_for_completion(&obd_zombie_stop);
1457 #else
1458         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1459         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1460 #endif
1461 }