Whamcloud - gitweb
615b2f9dda2a64ebc56de3a8841e15d732893b6a
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
50
51 extern struct list_head obd_types;
52 spinlock_t obd_types_lock;
53
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
58
59 struct list_head  obd_zombie_imports;
60 struct list_head  obd_zombie_exports;
61 spinlock_t        obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
63 static void obd_zombie_export_add(struct obd_export *exp);
64 static void obd_zombie_import_add(struct obd_import *imp);
65
66 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67
68 /*
69  * support functions: we could use inter-module communication, but this
70  * is more portable to other OS's
71  */
72 static struct obd_device *obd_device_alloc(void)
73 {
74         struct obd_device *obd;
75
76         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
77         if (obd != NULL) {
78                 obd->obd_magic = OBD_DEVICE_MAGIC;
79         }
80         return obd;
81 }
82 EXPORT_SYMBOL(obd_device_alloc);
83
84 static void obd_device_free(struct obd_device *obd)
85 {
86         LASSERT(obd != NULL);
87         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
89         if (obd->obd_namespace != NULL) {
90                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91                        obd, obd->obd_namespace, obd->obd_force);
92                 LBUG();
93         }
94         lu_ref_fini(&obd->obd_reference);
95         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
96 }
97
98 struct obd_type *class_search_type(const char *name)
99 {
100         struct list_head *tmp;
101         struct obd_type *type;
102
103         spin_lock(&obd_types_lock);
104         list_for_each(tmp, &obd_types) {
105                 type = list_entry(tmp, struct obd_type, typ_chain);
106                 if (strcmp(type->typ_name, name) == 0) {
107                         spin_unlock(&obd_types_lock);
108                         return type;
109                 }
110         }
111         spin_unlock(&obd_types_lock);
112         return NULL;
113 }
114
115 struct obd_type *class_get_type(const char *name)
116 {
117         struct obd_type *type = class_search_type(name);
118
119 #ifdef CONFIG_KMOD
120         if (!type) {
121                 const char *modname = name;
122                 if (!request_module(modname)) {
123                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
124                         type = class_search_type(name);
125                 } else {
126                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
127                                            modname);
128                 }
129         }
130 #endif
131         if (type) {
132                 spin_lock(&type->obd_type_lock);
133                 type->typ_refcnt++;
134                 try_module_get(type->typ_dt_ops->o_owner);
135                 spin_unlock(&type->obd_type_lock);
136         }
137         return type;
138 }
139
140 void class_put_type(struct obd_type *type)
141 {
142         LASSERT(type);
143         spin_lock(&type->obd_type_lock);
144         type->typ_refcnt--;
145         module_put(type->typ_dt_ops->o_owner);
146         spin_unlock(&type->obd_type_lock);
147 }
148
149 #define CLASS_MAX_NAME 1024
150
151 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
152                         struct lprocfs_vars *vars, const char *name,
153                         struct lu_device_type *ldt)
154 {
155         struct obd_type *type;
156         int rc = 0;
157         ENTRY;
158
159         /* sanity check */
160         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
161
162         if (class_search_type(name)) {
163                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
164                 RETURN(-EEXIST);
165         }
166
167         rc = -ENOMEM;
168         OBD_ALLOC(type, sizeof(*type));
169         if (type == NULL)
170                 RETURN(rc);
171
172         OBD_ALLOC_PTR(type->typ_dt_ops);
173         OBD_ALLOC_PTR(type->typ_md_ops);
174         OBD_ALLOC(type->typ_name, strlen(name) + 1);
175
176         if (type->typ_dt_ops == NULL ||
177             type->typ_md_ops == NULL ||
178             type->typ_name == NULL)
179                 GOTO (failed, rc);
180
181         *(type->typ_dt_ops) = *dt_ops;
182         /* md_ops is optional */
183         if (md_ops)
184                 *(type->typ_md_ops) = *md_ops;
185         strcpy(type->typ_name, name);
186         spin_lock_init(&type->obd_type_lock);
187
188 #ifdef LPROCFS
189         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
190                                               vars, type);
191         if (IS_ERR(type->typ_procroot)) {
192                 rc = PTR_ERR(type->typ_procroot);
193                 type->typ_procroot = NULL;
194                 GOTO (failed, rc);
195         }
196 #endif
197         if (ldt != NULL) {
198                 type->typ_lu = ldt;
199                 rc = lu_device_type_init(ldt);
200                 if (rc != 0)
201                         GOTO (failed, rc);
202         }
203
204         spin_lock(&obd_types_lock);
205         list_add(&type->typ_chain, &obd_types);
206         spin_unlock(&obd_types_lock);
207
208         RETURN (0);
209
210  failed:
211         if (type->typ_name != NULL)
212                 OBD_FREE(type->typ_name, strlen(name) + 1);
213         if (type->typ_md_ops != NULL)
214                 OBD_FREE_PTR(type->typ_md_ops);
215         if (type->typ_dt_ops != NULL)
216                 OBD_FREE_PTR(type->typ_dt_ops);
217         OBD_FREE(type, sizeof(*type));
218         RETURN(rc);
219 }
220
221 int class_unregister_type(const char *name)
222 {
223         struct obd_type *type = class_search_type(name);
224         ENTRY;
225
226         if (!type) {
227                 CERROR("unknown obd type\n");
228                 RETURN(-EINVAL);
229         }
230
231         if (type->typ_refcnt) {
232                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
233                 /* This is a bad situation, let's make the best of it */
234                 /* Remove ops, but leave the name for debugging */
235                 OBD_FREE_PTR(type->typ_dt_ops);
236                 OBD_FREE_PTR(type->typ_md_ops);
237                 RETURN(-EBUSY);
238         }
239
240         if (type->typ_procroot) {
241                 lprocfs_remove(&type->typ_procroot);
242         }
243
244         if (type->typ_lu)
245                 lu_device_type_fini(type->typ_lu);
246
247         spin_lock(&obd_types_lock);
248         list_del(&type->typ_chain);
249         spin_unlock(&obd_types_lock);
250         OBD_FREE(type->typ_name, strlen(name) + 1);
251         if (type->typ_dt_ops != NULL)
252                 OBD_FREE_PTR(type->typ_dt_ops);
253         if (type->typ_md_ops != NULL)
254                 OBD_FREE_PTR(type->typ_md_ops);
255         OBD_FREE(type, sizeof(*type));
256         RETURN(0);
257 } /* class_unregister_type */
258
259 /**
260  * Create a new obd device.
261  *
262  * Find an empty slot in ::obd_devs[], create a new obd device in it.
263  *
264  * \param typename [in] obd device type string.
265  * \param name     [in] obd device name.
266  *
267  * \retval NULL if create fails, otherwise return the obd device
268  *         pointer created.
269  */
270 struct obd_device *class_newdev(const char *type_name, const char *name)
271 {
272         struct obd_device *result = NULL;
273         struct obd_device *newdev;
274         struct obd_type *type = NULL;
275         int i;
276         int new_obd_minor = 0;
277
278         if (strlen(name) >= MAX_OBD_NAME) {
279                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
280                 RETURN(ERR_PTR(-EINVAL));
281         }
282
283         type = class_get_type(type_name);
284         if (type == NULL){
285                 CERROR("OBD: unknown type: %s\n", type_name);
286                 RETURN(ERR_PTR(-ENODEV));
287         }
288
289         newdev = obd_device_alloc();
290         if (newdev == NULL) {
291                 class_put_type(type);
292                 RETURN(ERR_PTR(-ENOMEM));
293         }
294         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
295
296         spin_lock(&obd_dev_lock);
297         for (i = 0; i < class_devno_max(); i++) {
298                 struct obd_device *obd = class_num2obd(i);
299                 if (obd && obd->obd_name &&
300                     (strcmp(name, obd->obd_name) == 0)) {
301                         CERROR("Device %s already exists, won't add\n", name);
302                         if (result) {
303                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
304                                          "%p obd_magic %08x != %08x\n", result,
305                                          result->obd_magic, OBD_DEVICE_MAGIC);
306                                 LASSERTF(result->obd_minor == new_obd_minor,
307                                          "%p obd_minor %d != %d\n", result,
308                                          result->obd_minor, new_obd_minor);
309
310                                 obd_devs[result->obd_minor] = NULL;
311                                 result->obd_name[0]='\0';
312                          }
313                         result = ERR_PTR(-EEXIST);
314                         break;
315                 }
316                 if (!result && !obd) {
317                         result = newdev;
318                         result->obd_minor = i;
319                         new_obd_minor = i;
320                         result->obd_type = type;
321                         strncpy(result->obd_name, name,
322                                 sizeof(result->obd_name) - 1);
323                         obd_devs[i] = result;
324                 }
325         }
326         spin_unlock(&obd_dev_lock);
327
328         if (result == NULL && i >= class_devno_max()) {
329                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
330                        class_devno_max());
331                 result = ERR_PTR(-EOVERFLOW);
332         }
333
334         if (IS_ERR(result)) {
335                 obd_device_free(newdev);
336                 class_put_type(type);
337         } else {
338                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
339                        result->obd_name, result);
340         }
341         return result;
342 }
343
344 void class_release_dev(struct obd_device *obd)
345 {
346         struct obd_type *obd_type = obd->obd_type;
347
348         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
349                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
350         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
351                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
352         LASSERT(obd_type != NULL);
353
354         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
355                obd->obd_name,obd->obd_type->typ_name);
356
357         spin_lock(&obd_dev_lock);
358         obd_devs[obd->obd_minor] = NULL;
359         spin_unlock(&obd_dev_lock);
360         obd_device_free(obd);
361
362         class_put_type(obd_type);
363 }
364
365 int class_name2dev(const char *name)
366 {
367         int i;
368
369         if (!name)
370                 return -1;
371
372         spin_lock(&obd_dev_lock);
373         for (i = 0; i < class_devno_max(); i++) {
374                 struct obd_device *obd = class_num2obd(i);
375                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
376                         /* Make sure we finished attaching before we give
377                            out any references */
378                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
379                         if (obd->obd_attached) {
380                                 spin_unlock(&obd_dev_lock);
381                                 return i;
382                         }
383                         break;
384                 }
385         }
386         spin_unlock(&obd_dev_lock);
387
388         return -1;
389 }
390
391 struct obd_device *class_name2obd(const char *name)
392 {
393         int dev = class_name2dev(name);
394
395         if (dev < 0 || dev > class_devno_max())
396                 return NULL;
397         return class_num2obd(dev);
398 }
399
400 int class_uuid2dev(struct obd_uuid *uuid)
401 {
402         int i;
403
404         spin_lock(&obd_dev_lock);
405         for (i = 0; i < class_devno_max(); i++) {
406                 struct obd_device *obd = class_num2obd(i);
407                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
408                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
409                         spin_unlock(&obd_dev_lock);
410                         return i;
411                 }
412         }
413         spin_unlock(&obd_dev_lock);
414
415         return -1;
416 }
417
418 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
419 {
420         int dev = class_uuid2dev(uuid);
421         if (dev < 0)
422                 return NULL;
423         return class_num2obd(dev);
424 }
425
426 /**
427  * Get obd device from ::obd_devs[]
428  *
429  * \param num [in] array index
430  *
431  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
432  *         otherwise return the obd device there.
433  */
434 struct obd_device *class_num2obd(int num)
435 {
436         struct obd_device *obd = NULL;
437
438         if (num < class_devno_max()) {
439                 obd = obd_devs[num];
440                 if (obd == NULL)
441                         return NULL;
442
443                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
444                          "%p obd_magic %08x != %08x\n",
445                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
446                 LASSERTF(obd->obd_minor == num,
447                          "%p obd_minor %0d != %0d\n",
448                          obd, obd->obd_minor, num);
449         }
450
451         return obd;
452 }
453
454 void class_obd_list(void)
455 {
456         char *status;
457         int i;
458
459         spin_lock(&obd_dev_lock);
460         for (i = 0; i < class_devno_max(); i++) {
461                 struct obd_device *obd = class_num2obd(i);
462                 if (obd == NULL)
463                         continue;
464                 if (obd->obd_stopping)
465                         status = "ST";
466                 else if (obd->obd_set_up)
467                         status = "UP";
468                 else if (obd->obd_attached)
469                         status = "AT";
470                 else
471                         status = "--";
472                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
473                          i, status, obd->obd_type->typ_name,
474                          obd->obd_name, obd->obd_uuid.uuid,
475                          atomic_read(&obd->obd_refcount));
476         }
477         spin_unlock(&obd_dev_lock);
478         return;
479 }
480
481 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
482    specified, then only the client with that uuid is returned,
483    otherwise any client connected to the tgt is returned. */
484 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
485                                           const char * typ_name,
486                                           struct obd_uuid *grp_uuid)
487 {
488         int i;
489
490         spin_lock(&obd_dev_lock);
491         for (i = 0; i < class_devno_max(); i++) {
492                 struct obd_device *obd = class_num2obd(i);
493                 if (obd == NULL)
494                         continue;
495                 if ((strncmp(obd->obd_type->typ_name, typ_name,
496                              strlen(typ_name)) == 0)) {
497                         if (obd_uuid_equals(tgt_uuid,
498                                             &obd->u.cli.cl_target_uuid) &&
499                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
500                                                          &obd->obd_uuid) : 1)) {
501                                 spin_unlock(&obd_dev_lock);
502                                 return obd;
503                         }
504                 }
505         }
506         spin_unlock(&obd_dev_lock);
507
508         return NULL;
509 }
510
511 /* Iterate the obd_device list looking devices have grp_uuid. Start
512    searching at *next, and if a device is found, the next index to look
513    at is saved in *next. If next is NULL, then the first matching device
514    will always be returned. */
515 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
516 {
517         int i;
518
519         if (next == NULL)
520                 i = 0;
521         else if (*next >= 0 && *next < class_devno_max())
522                 i = *next;
523         else
524                 return NULL;
525
526         spin_lock(&obd_dev_lock);
527         for (; i < class_devno_max(); i++) {
528                 struct obd_device *obd = class_num2obd(i);
529                 if (obd == NULL)
530                         continue;
531                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
532                         if (next != NULL)
533                                 *next = i+1;
534                         spin_unlock(&obd_dev_lock);
535                         return obd;
536                 }
537         }
538         spin_unlock(&obd_dev_lock);
539
540         return NULL;
541 }
542
543 /**
544  * to notify sptlrpc log for @fsname has changed, let every relevant OBD
545  * adjust sptlrpc settings accordingly.
546  */
547 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
548 {
549         struct obd_device  *obd;
550         const char         *type;
551         int                 i, rc = 0, rc2;
552
553         LASSERT(namelen > 0);
554
555         spin_lock(&obd_dev_lock);
556         for (i = 0; i < class_devno_max(); i++) {
557                 obd = class_num2obd(i);
558
559                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
560                         continue;
561
562                 /* only notify mdc, osc, mdt, ost */
563                 type = obd->obd_type->typ_name;
564                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
565                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
566                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
567                     strcmp(type, LUSTRE_OST_NAME) != 0)
568                         continue;
569
570                 if (strncmp(obd->obd_name, fsname, namelen))
571                         continue;
572
573                 class_incref(obd, __FUNCTION__, obd);
574                 spin_unlock(&obd_dev_lock);
575                 rc2 = obd_set_info_async(obd->obd_self_export,
576                                          sizeof(KEY_SPTLRPC_CONF),
577                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
578                 rc = rc ? rc : rc2;
579                 class_decref(obd, __FUNCTION__, obd);
580                 spin_lock(&obd_dev_lock);
581         }
582         spin_unlock(&obd_dev_lock);
583         return rc;
584 }
585 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
586
587 void obd_cleanup_caches(void)
588 {
589         int rc;
590
591         ENTRY;
592         if (obd_device_cachep) {
593                 rc = cfs_mem_cache_destroy(obd_device_cachep);
594                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
595                 obd_device_cachep = NULL;
596         }
597         if (obdo_cachep) {
598                 rc = cfs_mem_cache_destroy(obdo_cachep);
599                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
600                 obdo_cachep = NULL;
601         }
602         if (import_cachep) {
603                 rc = cfs_mem_cache_destroy(import_cachep);
604                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
605                 import_cachep = NULL;
606         }
607         if (capa_cachep) {
608                 rc = cfs_mem_cache_destroy(capa_cachep);
609                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
610                 capa_cachep = NULL;
611         }
612         EXIT;
613 }
614
615 int obd_init_caches(void)
616 {
617         ENTRY;
618
619         LASSERT(obd_device_cachep == NULL);
620         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
621                                                  sizeof(struct obd_device),
622                                                  0, 0);
623         if (!obd_device_cachep)
624                 GOTO(out, -ENOMEM);
625
626         LASSERT(obdo_cachep == NULL);
627         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
628                                            0, 0);
629         if (!obdo_cachep)
630                 GOTO(out, -ENOMEM);
631
632         LASSERT(import_cachep == NULL);
633         import_cachep = cfs_mem_cache_create("ll_import_cache",
634                                              sizeof(struct obd_import),
635                                              0, 0);
636         if (!import_cachep)
637                 GOTO(out, -ENOMEM);
638
639         LASSERT(capa_cachep == NULL);
640         capa_cachep = cfs_mem_cache_create("capa_cache",
641                                            sizeof(struct obd_capa), 0, 0);
642         if (!capa_cachep)
643                 GOTO(out, -ENOMEM);
644
645         RETURN(0);
646  out:
647         obd_cleanup_caches();
648         RETURN(-ENOMEM);
649
650 }
651
652 /* map connection to client */
653 struct obd_export *class_conn2export(struct lustre_handle *conn)
654 {
655         struct obd_export *export;
656         ENTRY;
657
658         if (!conn) {
659                 CDEBUG(D_CACHE, "looking for null handle\n");
660                 RETURN(NULL);
661         }
662
663         if (conn->cookie == -1) {  /* this means assign a new connection */
664                 CDEBUG(D_CACHE, "want a new connection\n");
665                 RETURN(NULL);
666         }
667
668         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
669         export = class_handle2object(conn->cookie);
670         RETURN(export);
671 }
672
673 struct obd_device *class_exp2obd(struct obd_export *exp)
674 {
675         if (exp)
676                 return exp->exp_obd;
677         return NULL;
678 }
679
680 struct obd_device *class_conn2obd(struct lustre_handle *conn)
681 {
682         struct obd_export *export;
683         export = class_conn2export(conn);
684         if (export) {
685                 struct obd_device *obd = export->exp_obd;
686                 class_export_put(export);
687                 return obd;
688         }
689         return NULL;
690 }
691
692 struct obd_import *class_exp2cliimp(struct obd_export *exp)
693 {
694         struct obd_device *obd = exp->exp_obd;
695         if (obd == NULL)
696                 return NULL;
697         return obd->u.cli.cl_import;
698 }
699
700 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
701 {
702         struct obd_device *obd = class_conn2obd(conn);
703         if (obd == NULL)
704                 return NULL;
705         return obd->u.cli.cl_import;
706 }
707
708 /* Export management functions */
709 static void class_export_destroy(struct obd_export *exp)
710 {
711         struct obd_device *obd = exp->exp_obd;
712         ENTRY;
713
714         LASSERT (atomic_read(&exp->exp_refcount) == 0);
715
716         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
717                exp->exp_client_uuid.uuid, obd->obd_name);
718
719         LASSERT(obd != NULL);
720
721         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
722         if (exp->exp_connection)
723                 ptlrpc_put_connection_superhack(exp->exp_connection);
724
725         LASSERT(list_empty(&exp->exp_outstanding_replies));
726         LASSERT(list_empty(&exp->exp_req_replay_queue));
727         LASSERT(list_empty(&exp->exp_queued_rpc));
728         obd_destroy_export(exp);
729         class_decref(obd, "export", exp);
730
731         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
732         EXIT;
733 }
734
/* Handle-table addref callback for exports (registered through
 * class_handle_hash()): takes one export reference. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
739
740 struct obd_export *class_export_get(struct obd_export *exp)
741 {
742         atomic_inc(&exp->exp_refcount);
743         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
744                atomic_read(&exp->exp_refcount));
745         return exp;
746 }
747 EXPORT_SYMBOL(class_export_get);
748
749 void class_export_put(struct obd_export *exp)
750 {
751         LASSERT(exp != NULL);
752         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
753                atomic_read(&exp->exp_refcount) - 1);
754         LASSERT(atomic_read(&exp->exp_refcount) > 0);
755         LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
756
757         if (atomic_dec_and_test(&exp->exp_refcount)) {
758                 CDEBUG(D_IOCTL, "final put %p/%s\n",
759                        exp, exp->exp_client_uuid.uuid);
760                 obd_zombie_export_add(exp);
761         }
762 }
763 EXPORT_SYMBOL(class_export_put);
764
765 /* Creates a new export, adds it to the hash table, and returns a
766  * pointer to it. The refcount is 2: one for the hash reference, and
767  * one for the pointer returned by this function. */
768 struct obd_export *class_new_export(struct obd_device *obd,
769                                     struct obd_uuid *cluuid)
770 {
771         struct obd_export *export;
772         int rc = 0;
773
774         OBD_ALLOC_PTR(export);
775         if (!export)
776                 return ERR_PTR(-ENOMEM);
777
778         export->exp_conn_cnt = 0;
779         export->exp_lock_hash = NULL;
780         atomic_set(&export->exp_refcount, 2);
781         atomic_set(&export->exp_rpc_count, 0);
782         export->exp_obd = obd;
783         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
784         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
785         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
786         CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
787         class_handle_hash(&export->exp_handle, export_handle_addref);
788         export->exp_last_request_time = cfs_time_current_sec();
789         spin_lock_init(&export->exp_lock);
790         INIT_HLIST_NODE(&export->exp_uuid_hash);
791         INIT_HLIST_NODE(&export->exp_nid_hash);
792
793         export->exp_sp_peer = LUSTRE_SP_ANY;
794         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
795         export->exp_client_uuid = *cluuid;
796         obd_init_export(export);
797
798         spin_lock(&obd->obd_dev_lock);
799         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
800                 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
801                                             &export->exp_uuid_hash);
802                 if (rc != 0) {
803                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
804                                       obd->obd_name, cluuid->uuid, rc);
805                         spin_unlock(&obd->obd_dev_lock);
806                         class_handle_unhash(&export->exp_handle);
807                         OBD_FREE_PTR(export);
808                         return ERR_PTR(-EALREADY);
809                 }
810         }
811
812         LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
813         class_incref(obd, "export", export);
814         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
815         list_add_tail(&export->exp_obd_chain_timed,
816                       &export->exp_obd->obd_exports_timed);
817         export->exp_obd->obd_num_exports++;
818         spin_unlock(&obd->obd_dev_lock);
819
820         return export;
821 }
822 EXPORT_SYMBOL(class_new_export);
823
824 void class_unlink_export(struct obd_export *exp)
825 {
826         class_handle_unhash(&exp->exp_handle);
827
828         spin_lock(&exp->exp_obd->obd_dev_lock);
829         /* delete an uuid-export hashitem from hashtables */
830         if (!hlist_unhashed(&exp->exp_uuid_hash))
831                 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
832                                 &exp->exp_client_uuid,
833                                 &exp->exp_uuid_hash);
834
835         list_del_init(&exp->exp_obd_chain);
836         list_del_init(&exp->exp_obd_chain_timed);
837         exp->exp_obd->obd_num_exports--;
838         spin_unlock(&exp->exp_obd->obd_dev_lock);
839
840         class_export_put(exp);
841 }
842 EXPORT_SYMBOL(class_unlink_export);
843
844 /* Import management functions */
845 void class_import_destroy(struct obd_import *imp)
846 {
847         ENTRY;
848
849         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
850                 imp->imp_obd->obd_name);
851
852         LASSERT(atomic_read(&imp->imp_refcount) == 0);
853
854         ptlrpc_put_connection_superhack(imp->imp_connection);
855
856         while (!list_empty(&imp->imp_conn_list)) {
857                 struct obd_import_conn *imp_conn;
858
859                 imp_conn = list_entry(imp->imp_conn_list.next,
860                                       struct obd_import_conn, oic_item);
861                 list_del_init(&imp_conn->oic_item);
862                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
863                 OBD_FREE(imp_conn, sizeof(*imp_conn));
864         }
865
866         LASSERT(imp->imp_sec == NULL);
867         class_decref(imp->imp_obd, "import", imp);
868         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
869         EXIT;
870 }
871
/**
 * Handle-hash callback: take a reference on the import behind a handle.
 * Registered with class_handle_hash() when an import is created.
 */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
876
877 struct obd_import *class_import_get(struct obd_import *import)
878 {
879         LASSERT(atomic_read(&import->imp_refcount) >= 0);
880         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
881         atomic_inc(&import->imp_refcount);
882         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
883                atomic_read(&import->imp_refcount), 
884                import->imp_obd->obd_name);
885         return import;
886 }
887 EXPORT_SYMBOL(class_import_get);
888
889 void class_import_put(struct obd_import *imp)
890 {
891         ENTRY;
892
893         LASSERT(atomic_read(&imp->imp_refcount) > 0);
894         LASSERT(atomic_read(&imp->imp_refcount) < 0x5a5a5a);
895         LASSERT(list_empty(&imp->imp_zombie_chain));
896
897         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
898                atomic_read(&imp->imp_refcount) - 1, 
899                imp->imp_obd->obd_name);
900
901         if (atomic_dec_and_test(&imp->imp_refcount)) {
902                 CDEBUG(D_INFO, "final put import %p\n", imp);
903                 obd_zombie_import_add(imp);
904         }
905
906         EXIT;
907 }
908 EXPORT_SYMBOL(class_import_put);
909
910 static void init_imp_at(struct imp_at *at) {
911         int i;
912         at_init(&at->iat_net_latency, 0, 0);
913         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
914                 /* max service estimates are tracked on the server side, so
915                    don't use the AT history here, just use the last reported
916                    val. (But keep hist for proc histogram, worst_ever) */
917                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
918                         AT_FLG_NOHIST);
919         }
920 }
921
922 struct obd_import *class_new_import(struct obd_device *obd)
923 {
924         struct obd_import *imp;
925
926         OBD_ALLOC(imp, sizeof(*imp));
927         if (imp == NULL)
928                 return NULL;
929
930         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
931         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
932         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
933         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
934         spin_lock_init(&imp->imp_lock);
935         imp->imp_last_success_conn = 0;
936         imp->imp_state = LUSTRE_IMP_NEW;
937         imp->imp_obd = class_incref(obd, "import", imp);
938         sema_init(&imp->imp_sec_mutex, 1);
939         cfs_waitq_init(&imp->imp_recovery_waitq);
940
941         atomic_set(&imp->imp_refcount, 2);
942         atomic_set(&imp->imp_unregistering, 0);
943         atomic_set(&imp->imp_inflight, 0);
944         atomic_set(&imp->imp_replay_inflight, 0);
945         atomic_set(&imp->imp_inval_count, 0);
946         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
947         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
948         class_handle_hash(&imp->imp_handle, import_handle_addref);
949         init_imp_at(&imp->imp_at);
950
951         /* the default magic is V2, will be used in connect RPC, and
952          * then adjusted according to the flags in request/reply. */
953         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
954
955         return imp;
956 }
957 EXPORT_SYMBOL(class_new_import);
958
959 void class_destroy_import(struct obd_import *import)
960 {
961         LASSERT(import != NULL);
962         LASSERT(import != LP_POISON);
963
964         class_handle_unhash(&import->imp_handle);
965
966         spin_lock(&import->imp_lock);
967         import->imp_generation++;
968         spin_unlock(&import->imp_lock);
969         class_import_put(import);
970 }
971 EXPORT_SYMBOL(class_destroy_import);
972
973 /* A connection defines an export context in which preallocation can
974    be managed. This releases the export pointer reference, and returns
975    the export handle, so the export refcount is 1 when this function
976    returns. */
977 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
978                   struct obd_uuid *cluuid)
979 {
980         struct obd_export *export;
981         LASSERT(conn != NULL);
982         LASSERT(obd != NULL);
983         LASSERT(cluuid != NULL);
984         ENTRY;
985
986         export = class_new_export(obd, cluuid);
987         if (IS_ERR(export))
988                 RETURN(PTR_ERR(export));
989
990         conn->cookie = export->exp_handle.h_cookie;
991         class_export_put(export);
992
993         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
994                cluuid->uuid, conn->cookie);
995         RETURN(0);
996 }
997 EXPORT_SYMBOL(class_connect);
998
999 /* if export is involved in recovery then clean up related things */
1000 void class_export_recovery_cleanup(struct obd_export *exp)
1001 {
1002         struct obd_device *obd = exp->exp_obd;
1003
1004         spin_lock_bh(&obd->obd_processing_task_lock);
1005         if (obd->obd_recovering && exp->exp_in_recovery) {
1006                 spin_lock(&exp->exp_lock);
1007                 exp->exp_in_recovery = 0;
1008                 spin_unlock(&exp->exp_lock);
1009                 obd->obd_connected_clients--;
1010                 /* each connected client is counted as recoverable */
1011                 obd->obd_recoverable_clients--;
1012                 if (exp->exp_req_replay_needed) {
1013                         spin_lock(&exp->exp_lock);
1014                         exp->exp_req_replay_needed = 0;
1015                         spin_unlock(&exp->exp_lock);
1016                         LASSERT(atomic_read(&obd->obd_req_replay_clients));
1017                         atomic_dec(&obd->obd_req_replay_clients);
1018                 }
1019                 if (exp->exp_lock_replay_needed) {
1020                         spin_lock(&exp->exp_lock);
1021                         exp->exp_lock_replay_needed = 0;
1022                         spin_unlock(&exp->exp_lock);
1023                         LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1024                         atomic_dec(&obd->obd_lock_replay_clients);
1025                 }
1026         }
1027         spin_unlock_bh(&obd->obd_processing_task_lock);
1028 }
1029
1030 /* This function removes two references from the export: one for the
1031  * hash entry and one for the export pointer passed in.  The export
1032  * pointer passed to this function is destroyed should not be used
1033  * again. */
1034 int class_disconnect(struct obd_export *export)
1035 {
1036         int already_disconnected;
1037         ENTRY;
1038
1039         if (export == NULL) {
1040                 fixme();
1041                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1042                 RETURN(-EINVAL);
1043         }
1044
1045         spin_lock(&export->exp_lock);
1046         already_disconnected = export->exp_disconnected;
1047         export->exp_disconnected = 1;
1048
1049         if (!hlist_unhashed(&export->exp_nid_hash))
1050                 lustre_hash_del(export->exp_obd->obd_nid_hash,
1051                                 &export->exp_connection->c_peer.nid,
1052                                 &export->exp_nid_hash);
1053
1054         spin_unlock(&export->exp_lock);
1055
1056         /* class_cleanup(), abort_recovery(), and class_fail_export()
1057          * all end up in here, and if any of them race we shouldn't
1058          * call extra class_export_puts(). */
1059         if (already_disconnected)
1060                 RETURN(0);
1061
1062         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1063                export->exp_handle.h_cookie);
1064
1065         class_export_recovery_cleanup(export);
1066         class_unlink_export(export);
1067         class_export_put(export);
1068         RETURN(0);
1069 }
1070
1071 static void class_disconnect_export_list(struct list_head *list,
1072                                          enum obd_option flags)
1073 {
1074         int rc;
1075         struct lustre_handle fake_conn;
1076         struct obd_export *fake_exp, *exp;
1077         ENTRY;
1078
1079         /* It's possible that an export may disconnect itself, but
1080          * nothing else will be added to this list. */
1081         while (!list_empty(list)) {
1082                 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1083                 class_export_get(exp);
1084
1085                 spin_lock(&exp->exp_lock);
1086                 exp->exp_flags = flags;
1087                 spin_unlock(&exp->exp_lock);
1088
1089                 if (obd_uuid_equals(&exp->exp_client_uuid,
1090                                     &exp->exp_obd->obd_uuid)) {
1091                         CDEBUG(D_HA,
1092                                "exp %p export uuid == obd uuid, don't discon\n",
1093                                exp);
1094                         /* Need to delete this now so we don't end up pointing
1095                          * to work_list later when this export is cleaned up. */
1096                         list_del_init(&exp->exp_obd_chain);
1097                         class_export_put(exp);
1098                         continue;
1099                 }
1100
1101                 fake_conn.cookie = exp->exp_handle.h_cookie;
1102                 fake_exp = class_conn2export(&fake_conn);
1103                 if (!fake_exp) {
1104                         class_export_put(exp);
1105                         continue;
1106                 }
1107
1108                 spin_lock(&fake_exp->exp_lock);
1109                 fake_exp->exp_flags = flags;
1110                 spin_unlock(&fake_exp->exp_lock);
1111
1112                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1113                        "last request at "CFS_TIME_T"\n",
1114                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1115                        exp, exp->exp_last_request_time);
1116                 rc = obd_disconnect(fake_exp);
1117                 class_export_put(exp);
1118         }
1119         EXIT;
1120 }
1121
1122 void class_disconnect_exports(struct obd_device *obd)
1123 {
1124         struct list_head work_list;
1125         ENTRY;
1126
1127         /* Move all of the exports from obd_exports to a work list, en masse. */
1128         spin_lock(&obd->obd_dev_lock);
1129         list_add(&work_list, &obd->obd_exports);
1130         list_del_init(&obd->obd_exports);
1131         spin_unlock(&obd->obd_dev_lock);
1132
1133         if (!list_empty(&work_list)) {
1134                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1135                        "disconnecting them\n", obd->obd_minor, obd);
1136                 class_disconnect_export_list(&work_list,
1137                                              exp_flags_from_obd(obd));
1138         } else
1139                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1140                        obd->obd_minor, obd);
1141         EXIT;
1142 }
1143 EXPORT_SYMBOL(class_disconnect_exports);
1144
1145 /* Remove exports that have not completed recovery.
1146  */
1147 int class_disconnect_stale_exports(struct obd_device *obd,
1148                                    int (*test_export)(struct obd_export *),
1149                                    enum obd_option flags)
1150 {
1151         struct list_head work_list;
1152         struct list_head *pos, *n;
1153         struct obd_export *exp;
1154         int cnt = 0;
1155         ENTRY;
1156
1157         CFS_INIT_LIST_HEAD(&work_list);
1158         spin_lock(&obd->obd_dev_lock);
1159         list_for_each_safe(pos, n, &obd->obd_exports) {
1160                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1161                 if (test_export(exp))
1162                         continue;
1163
1164                 list_del(&exp->exp_obd_chain);
1165                 list_add(&exp->exp_obd_chain, &work_list);
1166                 /* don't count self-export as client */
1167                 if (obd_uuid_equals(&exp->exp_client_uuid,
1168                                      &exp->exp_obd->obd_uuid))
1169                         continue;
1170
1171                 cnt++;
1172                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1173                        obd->obd_name, exp->exp_client_uuid.uuid,
1174                        exp->exp_connection == NULL ? "<unknown>" :
1175                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1176         }
1177         spin_unlock(&obd->obd_dev_lock);
1178
1179         CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
1180                obd->obd_name, cnt);
1181         class_disconnect_export_list(&work_list, flags);
1182         RETURN(cnt);
1183 }
1184 EXPORT_SYMBOL(class_disconnect_stale_exports);
1185
1186 void class_fail_export(struct obd_export *exp)
1187 {
1188         int rc, already_failed;
1189
1190         spin_lock(&exp->exp_lock);
1191         already_failed = exp->exp_failed;
1192         exp->exp_failed = 1;
1193         spin_unlock(&exp->exp_lock);
1194
1195         if (already_failed) {
1196                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1197                        exp, exp->exp_client_uuid.uuid);
1198                 return;
1199         }
1200
1201         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1202                exp, exp->exp_client_uuid.uuid);
1203
1204         if (obd_dump_on_timeout)
1205                 libcfs_debug_dumplog();
1206
1207         /* Most callers into obd_disconnect are removing their own reference
1208          * (request, for example) in addition to the one from the hash table.
1209          * We don't have such a reference here, so make one. */
1210         class_export_get(exp);
1211         rc = obd_disconnect(exp);
1212         if (rc)
1213                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1214         else
1215                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1216                        exp, exp->exp_client_uuid.uuid);
1217 }
1218 EXPORT_SYMBOL(class_fail_export);
1219
1220 char *obd_export_nid2str(struct obd_export *exp)
1221 {
1222         if (exp->exp_connection != NULL)
1223                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1224
1225         return "(no nid)";
1226 }
1227 EXPORT_SYMBOL(obd_export_nid2str);
1228
1229 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1230 {
1231         struct obd_export *doomed_exp = NULL;
1232         int exports_evicted = 0;
1233
1234         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1235
1236         do {
1237                 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1238                 if (doomed_exp == NULL)
1239                         break;
1240
1241                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1242                          "nid %s found, wanted nid %s, requested nid %s\n",
1243                          obd_export_nid2str(doomed_exp),
1244                          libcfs_nid2str(nid_key), nid);
1245                 LASSERTF(doomed_exp != obd->obd_self_export,
1246                          "self-export is hashed by NID?\n");
1247                 exports_evicted++;
1248                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1249                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1250                        exports_evicted);
1251                 class_fail_export(doomed_exp);
1252                 class_export_put(doomed_exp);
1253         } while (1);
1254
1255         if (!exports_evicted)
1256                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1257                        obd->obd_name, nid);
1258         return exports_evicted;
1259 }
1260 EXPORT_SYMBOL(obd_export_evict_by_nid);
1261
1262 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1263 {
1264         struct obd_export *doomed_exp = NULL;
1265         struct obd_uuid doomed_uuid;
1266         int exports_evicted = 0;
1267
1268         obd_str2uuid(&doomed_uuid, uuid);
1269         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1270                 CERROR("%s: can't evict myself\n", obd->obd_name);
1271                 return exports_evicted;
1272         }
1273
1274         doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1275
1276         if (doomed_exp == NULL) {
1277                 CERROR("%s: can't disconnect %s: no exports found\n",
1278                        obd->obd_name, uuid);
1279         } else {
1280                 CWARN("%s: evicting %s at adminstrative request\n",
1281                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1282                 class_fail_export(doomed_exp);
1283                 class_export_put(doomed_exp);
1284                 exports_evicted++;
1285         }
1286
1287         return exports_evicted;
1288 }
1289 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1290
1291 /**
1292  * kill zombie imports and exports
1293  */
1294 void obd_zombie_impexp_cull(void)
1295 {
1296         struct obd_import *import;
1297         struct obd_export *export;
1298         ENTRY;
1299
1300         do {
1301                 spin_lock(&obd_zombie_impexp_lock);
1302
1303                 import = NULL;
1304                 if (!list_empty(&obd_zombie_imports)) {
1305                         import = list_entry(obd_zombie_imports.next,
1306                                             struct obd_import,
1307                                             imp_zombie_chain);
1308                         list_del_init(&import->imp_zombie_chain);
1309                 }
1310
1311                 export = NULL;
1312                 if (!list_empty(&obd_zombie_exports)) {
1313                         export = list_entry(obd_zombie_exports.next,
1314                                             struct obd_export,
1315                                             exp_obd_chain);
1316                         list_del_init(&export->exp_obd_chain);
1317                 }
1318
1319                 spin_unlock(&obd_zombie_impexp_lock);
1320
1321                 if (import != NULL)
1322                         class_import_destroy(import);
1323
1324                 if (export != NULL)
1325                         class_export_destroy(export);
1326
1327         } while (import != NULL || export != NULL);
1328         EXIT;
1329 }
1330
1331 static struct completion        obd_zombie_start;
1332 static struct completion        obd_zombie_stop;
1333 static unsigned long            obd_zombie_flags;
1334 static cfs_waitq_t              obd_zombie_waitq;
1335
1336 enum {
1337         OBD_ZOMBIE_STOP   = 1 << 1
1338 };
1339
1340 /**
1341  * check for work for kill zombie import/export thread.
1342  */
1343 static int obd_zombie_impexp_check(void *arg)
1344 {
1345         int rc;
1346
1347         spin_lock(&obd_zombie_impexp_lock);
1348         rc = list_empty(&obd_zombie_imports) &&
1349              list_empty(&obd_zombie_exports) &&
1350              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1351
1352         spin_unlock(&obd_zombie_impexp_lock);
1353
1354         RETURN(rc);
1355 }
1356
1357 /**
1358  * Add export to the obd_zombe thread and notify it.
1359  */
1360 static void obd_zombie_export_add(struct obd_export *exp) {
1361         spin_lock(&obd_zombie_impexp_lock);
1362         LASSERT(list_empty(&exp->exp_obd_chain));
1363         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1364         spin_unlock(&obd_zombie_impexp_lock);
1365
1366         if (obd_zombie_impexp_notify != NULL)
1367                 obd_zombie_impexp_notify();
1368 }
1369
1370 /**
1371  * Add import to the obd_zombe thread and notify it.
1372  */
1373 static void obd_zombie_import_add(struct obd_import *imp) {
1374         spin_lock(&obd_zombie_impexp_lock);
1375         LASSERT(list_empty(&imp->imp_zombie_chain));
1376         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1377         spin_unlock(&obd_zombie_impexp_lock);
1378
1379         if (obd_zombie_impexp_notify != NULL)
1380                 obd_zombie_impexp_notify();
1381 }
1382
1383 /**
1384  * notify import/export destroy thread about new zombie.
1385  */
1386 static void obd_zombie_impexp_notify(void)
1387 {
1388         cfs_waitq_signal(&obd_zombie_waitq);
1389 }
1390
1391 /**
1392  * check whether obd_zombie is idle
1393  */
1394 static int obd_zombie_is_idle(void)
1395 {
1396         int rc;
1397
1398         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1399         spin_lock(&obd_zombie_impexp_lock);
1400         rc = list_empty(&obd_zombie_imports) &&
1401              list_empty(&obd_zombie_exports);
1402         spin_unlock(&obd_zombie_impexp_lock);
1403         return rc;
1404 }
1405
1406 /**
1407  * wait when obd_zombie import/export queues become empty
1408  */
1409 void obd_zombie_barrier(void)
1410 {
1411         struct l_wait_info lwi = { 0 };
1412         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1413 }
1414 EXPORT_SYMBOL(obd_zombie_barrier);
1415
1416 #ifdef __KERNEL__
1417
1418 /**
1419  * destroy zombie export/import thread.
1420  */
1421 static int obd_zombie_impexp_thread(void *unused)
1422 {
1423         int rc;
1424
1425         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1426                 complete(&obd_zombie_start);
1427                 RETURN(rc);
1428         }
1429
1430         complete(&obd_zombie_start);
1431
1432         while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1433                 struct l_wait_info lwi = { 0 };
1434
1435                 l_wait_event(obd_zombie_waitq, 
1436                              !obd_zombie_impexp_check(NULL), &lwi);
1437                 obd_zombie_impexp_cull();
1438
1439                 /* 
1440                  * Notify obd_zombie_barrier callers that queues
1441                  * may be empty.
1442                  */
1443                 cfs_waitq_signal(&obd_zombie_waitq);
1444         }
1445
1446         complete(&obd_zombie_stop);
1447
1448         RETURN(0);
1449 }
1450
1451 #else /* ! KERNEL */
1452
1453 static atomic_t zombie_recur = ATOMIC_INIT(0);
1454 static void *obd_zombie_impexp_work_cb;
1455 static void *obd_zombie_impexp_idle_cb;
1456
1457 int obd_zombie_impexp_kill(void *arg)
1458 {
1459         int rc = 0;
1460
1461         if (atomic_inc_return(&zombie_recur) == 1) {
1462                 obd_zombie_impexp_cull();
1463                 rc = 1;
1464         }
1465         atomic_dec(&zombie_recur);
1466         return rc;
1467 }
1468
1469 #endif
1470
1471 /**
1472  * start destroy zombie import/export thread
1473  */
1474 int obd_zombie_impexp_init(void)
1475 {
1476         int rc;
1477
1478         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1479         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1480         spin_lock_init(&obd_zombie_impexp_lock);
1481         init_completion(&obd_zombie_start);
1482         init_completion(&obd_zombie_stop);
1483         cfs_waitq_init(&obd_zombie_waitq);
1484
1485 #ifdef __KERNEL__
1486         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1487         if (rc < 0)
1488                 RETURN(rc);
1489
1490         wait_for_completion(&obd_zombie_start);
1491 #else
1492
1493         obd_zombie_impexp_work_cb =
1494                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1495                                                  &obd_zombie_impexp_kill, NULL);
1496
1497         obd_zombie_impexp_idle_cb =
1498                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1499                                                  &obd_zombie_impexp_check, NULL);
1500         rc = 0;
1501 #endif
1502         RETURN(rc);
1503 }
1504 /**
1505  * stop destroy zombie import/export thread
1506  */
1507 void obd_zombie_impexp_stop(void)
1508 {
1509         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1510         obd_zombie_impexp_notify();
1511 #ifdef __KERNEL__
1512         wait_for_completion(&obd_zombie_stop);
1513 #else
1514         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1515         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1516 #endif
1517 }