Whamcloud - gitweb
resolve race between obd_disconnect and class_disconnect_exports
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
50
51 extern struct list_head obd_types;
52 spinlock_t obd_types_lock;
53
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
58
59 struct list_head  obd_zombie_imports;
60 struct list_head  obd_zombie_exports;
61 spinlock_t        obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
63 static void obd_zombie_export_add(struct obd_export *exp);
64 static void obd_zombie_import_add(struct obd_import *imp);
65
66 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67
68 /*
69  * support functions: we could use inter-module communication, but this
70  * is more portable to other OS's
71  */
72 static struct obd_device *obd_device_alloc(void)
73 {
74         struct obd_device *obd;
75
76         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
77         if (obd != NULL) {
78                 obd->obd_magic = OBD_DEVICE_MAGIC;
79         }
80         return obd;
81 }
82 EXPORT_SYMBOL(obd_device_alloc);
83
84 static void obd_device_free(struct obd_device *obd)
85 {
86         LASSERT(obd != NULL);
87         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
89         if (obd->obd_namespace != NULL) {
90                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91                        obd, obd->obd_namespace, obd->obd_force);
92                 LBUG();
93         }
94         lu_ref_fini(&obd->obd_reference);
95         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
96 }
97
98 struct obd_type *class_search_type(const char *name)
99 {
100         struct list_head *tmp;
101         struct obd_type *type;
102
103         spin_lock(&obd_types_lock);
104         list_for_each(tmp, &obd_types) {
105                 type = list_entry(tmp, struct obd_type, typ_chain);
106                 if (strcmp(type->typ_name, name) == 0) {
107                         spin_unlock(&obd_types_lock);
108                         return type;
109                 }
110         }
111         spin_unlock(&obd_types_lock);
112         return NULL;
113 }
114
115 struct obd_type *class_get_type(const char *name)
116 {
117         struct obd_type *type = class_search_type(name);
118
119 #ifdef CONFIG_KMOD
120         if (!type) {
121                 const char *modname = name;
122                 if (!request_module(modname)) {
123                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
124                         type = class_search_type(name);
125                 } else {
126                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
127                                            modname);
128                 }
129         }
130 #endif
131         if (type) {
132                 spin_lock(&type->obd_type_lock);
133                 type->typ_refcnt++;
134                 try_module_get(type->typ_dt_ops->o_owner);
135                 spin_unlock(&type->obd_type_lock);
136         }
137         return type;
138 }
139
140 void class_put_type(struct obd_type *type)
141 {
142         LASSERT(type);
143         spin_lock(&type->obd_type_lock);
144         type->typ_refcnt--;
145         module_put(type->typ_dt_ops->o_owner);
146         spin_unlock(&type->obd_type_lock);
147 }
148
149 #define CLASS_MAX_NAME 1024
150
151 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
152                         struct lprocfs_vars *vars, const char *name,
153                         struct lu_device_type *ldt)
154 {
155         struct obd_type *type;
156         int rc = 0;
157         ENTRY;
158
159         /* sanity check */
160         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
161
162         if (class_search_type(name)) {
163                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
164                 RETURN(-EEXIST);
165         }
166
167         rc = -ENOMEM;
168         OBD_ALLOC(type, sizeof(*type));
169         if (type == NULL)
170                 RETURN(rc);
171
172         OBD_ALLOC_PTR(type->typ_dt_ops);
173         OBD_ALLOC_PTR(type->typ_md_ops);
174         OBD_ALLOC(type->typ_name, strlen(name) + 1);
175
176         if (type->typ_dt_ops == NULL ||
177             type->typ_md_ops == NULL ||
178             type->typ_name == NULL)
179                 GOTO (failed, rc);
180
181         *(type->typ_dt_ops) = *dt_ops;
182         /* md_ops is optional */
183         if (md_ops)
184                 *(type->typ_md_ops) = *md_ops;
185         strcpy(type->typ_name, name);
186         spin_lock_init(&type->obd_type_lock);
187
188 #ifdef LPROCFS
189         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
190                                               vars, type);
191         if (IS_ERR(type->typ_procroot)) {
192                 rc = PTR_ERR(type->typ_procroot);
193                 type->typ_procroot = NULL;
194                 GOTO (failed, rc);
195         }
196 #endif
197         if (ldt != NULL) {
198                 type->typ_lu = ldt;
199                 rc = lu_device_type_init(ldt);
200                 if (rc != 0)
201                         GOTO (failed, rc);
202         }
203
204         spin_lock(&obd_types_lock);
205         list_add(&type->typ_chain, &obd_types);
206         spin_unlock(&obd_types_lock);
207
208         RETURN (0);
209
210  failed:
211         if (type->typ_name != NULL)
212                 OBD_FREE(type->typ_name, strlen(name) + 1);
213         if (type->typ_md_ops != NULL)
214                 OBD_FREE_PTR(type->typ_md_ops);
215         if (type->typ_dt_ops != NULL)
216                 OBD_FREE_PTR(type->typ_dt_ops);
217         OBD_FREE(type, sizeof(*type));
218         RETURN(rc);
219 }
220
221 int class_unregister_type(const char *name)
222 {
223         struct obd_type *type = class_search_type(name);
224         ENTRY;
225
226         if (!type) {
227                 CERROR("unknown obd type\n");
228                 RETURN(-EINVAL);
229         }
230
231         if (type->typ_refcnt) {
232                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
233                 /* This is a bad situation, let's make the best of it */
234                 /* Remove ops, but leave the name for debugging */
235                 OBD_FREE_PTR(type->typ_dt_ops);
236                 OBD_FREE_PTR(type->typ_md_ops);
237                 RETURN(-EBUSY);
238         }
239
240         if (type->typ_procroot) {
241                 lprocfs_remove(&type->typ_procroot);
242         }
243
244         if (type->typ_lu)
245                 lu_device_type_fini(type->typ_lu);
246
247         spin_lock(&obd_types_lock);
248         list_del(&type->typ_chain);
249         spin_unlock(&obd_types_lock);
250         OBD_FREE(type->typ_name, strlen(name) + 1);
251         if (type->typ_dt_ops != NULL)
252                 OBD_FREE_PTR(type->typ_dt_ops);
253         if (type->typ_md_ops != NULL)
254                 OBD_FREE_PTR(type->typ_md_ops);
255         OBD_FREE(type, sizeof(*type));
256         RETURN(0);
257 } /* class_unregister_type */
258
259 /**
260  * Create a new obd device.
261  *
262  * Find an empty slot in ::obd_devs[], create a new obd device in it.
263  *
264  * \param typename [in] obd device type string.
265  * \param name     [in] obd device name.
266  *
267  * \retval NULL if create fails, otherwise return the obd device
268  *         pointer created.
269  */
270 struct obd_device *class_newdev(const char *type_name, const char *name)
271 {
272         struct obd_device *result = NULL;
273         struct obd_device *newdev;
274         struct obd_type *type = NULL;
275         int i;
276         int new_obd_minor = 0;
277
278         if (strlen(name) >= MAX_OBD_NAME) {
279                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
280                 RETURN(ERR_PTR(-EINVAL));
281         }
282
283         type = class_get_type(type_name);
284         if (type == NULL){
285                 CERROR("OBD: unknown type: %s\n", type_name);
286                 RETURN(ERR_PTR(-ENODEV));
287         }
288
289         newdev = obd_device_alloc();
290         if (newdev == NULL) {
291                 class_put_type(type);
292                 RETURN(ERR_PTR(-ENOMEM));
293         }
294         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
295
296         spin_lock(&obd_dev_lock);
297         for (i = 0; i < class_devno_max(); i++) {
298                 struct obd_device *obd = class_num2obd(i);
299                 if (obd && obd->obd_name &&
300                     (strcmp(name, obd->obd_name) == 0)) {
301                         CERROR("Device %s already exists, won't add\n", name);
302                         if (result) {
303                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
304                                          "%p obd_magic %08x != %08x\n", result,
305                                          result->obd_magic, OBD_DEVICE_MAGIC);
306                                 LASSERTF(result->obd_minor == new_obd_minor,
307                                          "%p obd_minor %d != %d\n", result,
308                                          result->obd_minor, new_obd_minor);
309
310                                 obd_devs[result->obd_minor] = NULL;
311                                 result->obd_name[0]='\0';
312                          }
313                         result = ERR_PTR(-EEXIST);
314                         break;
315                 }
316                 if (!result && !obd) {
317                         result = newdev;
318                         result->obd_minor = i;
319                         new_obd_minor = i;
320                         result->obd_type = type;
321                         strncpy(result->obd_name, name,
322                                 sizeof(result->obd_name) - 1);
323                         obd_devs[i] = result;
324                 }
325         }
326         spin_unlock(&obd_dev_lock);
327
328         if (result == NULL && i >= class_devno_max()) {
329                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
330                        class_devno_max());
331                 result = ERR_PTR(-EOVERFLOW);
332         }
333
334         if (IS_ERR(result)) {
335                 obd_device_free(newdev);
336                 class_put_type(type);
337         } else {
338                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
339                        result->obd_name, result);
340         }
341         return result;
342 }
343
344 void class_release_dev(struct obd_device *obd)
345 {
346         struct obd_type *obd_type = obd->obd_type;
347
348         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
349                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
350         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
351                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
352         LASSERT(obd_type != NULL);
353
354         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
355                obd->obd_name,obd->obd_type->typ_name);
356
357         spin_lock(&obd_dev_lock);
358         obd_devs[obd->obd_minor] = NULL;
359         spin_unlock(&obd_dev_lock);
360         obd_device_free(obd);
361
362         class_put_type(obd_type);
363 }
364
365 int class_name2dev(const char *name)
366 {
367         int i;
368
369         if (!name)
370                 return -1;
371
372         spin_lock(&obd_dev_lock);
373         for (i = 0; i < class_devno_max(); i++) {
374                 struct obd_device *obd = class_num2obd(i);
375                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
376                         /* Make sure we finished attaching before we give
377                            out any references */
378                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
379                         if (obd->obd_attached) {
380                                 spin_unlock(&obd_dev_lock);
381                                 return i;
382                         }
383                         break;
384                 }
385         }
386         spin_unlock(&obd_dev_lock);
387
388         return -1;
389 }
390
391 struct obd_device *class_name2obd(const char *name)
392 {
393         int dev = class_name2dev(name);
394
395         if (dev < 0 || dev > class_devno_max())
396                 return NULL;
397         return class_num2obd(dev);
398 }
399
400 int class_uuid2dev(struct obd_uuid *uuid)
401 {
402         int i;
403
404         spin_lock(&obd_dev_lock);
405         for (i = 0; i < class_devno_max(); i++) {
406                 struct obd_device *obd = class_num2obd(i);
407                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
408                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
409                         spin_unlock(&obd_dev_lock);
410                         return i;
411                 }
412         }
413         spin_unlock(&obd_dev_lock);
414
415         return -1;
416 }
417
418 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
419 {
420         int dev = class_uuid2dev(uuid);
421         if (dev < 0)
422                 return NULL;
423         return class_num2obd(dev);
424 }
425
426 /**
427  * Get obd device from ::obd_devs[]
428  *
429  * \param num [in] array index
430  *
431  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
432  *         otherwise return the obd device there.
433  */
434 struct obd_device *class_num2obd(int num)
435 {
436         struct obd_device *obd = NULL;
437
438         if (num < class_devno_max()) {
439                 obd = obd_devs[num];
440                 if (obd == NULL)
441                         return NULL;
442
443                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
444                          "%p obd_magic %08x != %08x\n",
445                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
446                 LASSERTF(obd->obd_minor == num,
447                          "%p obd_minor %0d != %0d\n",
448                          obd, obd->obd_minor, num);
449         }
450
451         return obd;
452 }
453
454 void class_obd_list(void)
455 {
456         char *status;
457         int i;
458
459         spin_lock(&obd_dev_lock);
460         for (i = 0; i < class_devno_max(); i++) {
461                 struct obd_device *obd = class_num2obd(i);
462                 if (obd == NULL)
463                         continue;
464                 if (obd->obd_stopping)
465                         status = "ST";
466                 else if (obd->obd_set_up)
467                         status = "UP";
468                 else if (obd->obd_attached)
469                         status = "AT";
470                 else
471                         status = "--";
472                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
473                          i, status, obd->obd_type->typ_name,
474                          obd->obd_name, obd->obd_uuid.uuid,
475                          atomic_read(&obd->obd_refcount));
476         }
477         spin_unlock(&obd_dev_lock);
478         return;
479 }
480
481 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
482    specified, then only the client with that uuid is returned,
483    otherwise any client connected to the tgt is returned. */
484 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
485                                           const char * typ_name,
486                                           struct obd_uuid *grp_uuid)
487 {
488         int i;
489
490         spin_lock(&obd_dev_lock);
491         for (i = 0; i < class_devno_max(); i++) {
492                 struct obd_device *obd = class_num2obd(i);
493                 if (obd == NULL)
494                         continue;
495                 if ((strncmp(obd->obd_type->typ_name, typ_name,
496                              strlen(typ_name)) == 0)) {
497                         if (obd_uuid_equals(tgt_uuid,
498                                             &obd->u.cli.cl_target_uuid) &&
499                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
500                                                          &obd->obd_uuid) : 1)) {
501                                 spin_unlock(&obd_dev_lock);
502                                 return obd;
503                         }
504                 }
505         }
506         spin_unlock(&obd_dev_lock);
507
508         return NULL;
509 }
510
511 /* Iterate the obd_device list looking devices have grp_uuid. Start
512    searching at *next, and if a device is found, the next index to look
513    at is saved in *next. If next is NULL, then the first matching device
514    will always be returned. */
515 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
516 {
517         int i;
518
519         if (next == NULL)
520                 i = 0;
521         else if (*next >= 0 && *next < class_devno_max())
522                 i = *next;
523         else
524                 return NULL;
525
526         spin_lock(&obd_dev_lock);
527         for (; i < class_devno_max(); i++) {
528                 struct obd_device *obd = class_num2obd(i);
529                 if (obd == NULL)
530                         continue;
531                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
532                         if (next != NULL)
533                                 *next = i+1;
534                         spin_unlock(&obd_dev_lock);
535                         return obd;
536                 }
537         }
538         spin_unlock(&obd_dev_lock);
539
540         return NULL;
541 }
542
543 /**
544  * to notify sptlrpc log for @fsname has changed, let every relevant OBD
545  * adjust sptlrpc settings accordingly.
546  */
547 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
548 {
549         struct obd_device  *obd;
550         const char         *type;
551         int                 i, rc = 0, rc2;
552
553         LASSERT(namelen > 0);
554
555         spin_lock(&obd_dev_lock);
556         for (i = 0; i < class_devno_max(); i++) {
557                 obd = class_num2obd(i);
558
559                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
560                         continue;
561
562                 /* only notify mdc, osc, mdt, ost */
563                 type = obd->obd_type->typ_name;
564                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
565                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
566                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
567                     strcmp(type, LUSTRE_OST_NAME) != 0)
568                         continue;
569
570                 if (strncmp(obd->obd_name, fsname, namelen))
571                         continue;
572
573                 class_incref(obd, __FUNCTION__, obd);
574                 spin_unlock(&obd_dev_lock);
575                 rc2 = obd_set_info_async(obd->obd_self_export,
576                                          sizeof(KEY_SPTLRPC_CONF),
577                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
578                 rc = rc ? rc : rc2;
579                 class_decref(obd, __FUNCTION__, obd);
580                 spin_lock(&obd_dev_lock);
581         }
582         spin_unlock(&obd_dev_lock);
583         return rc;
584 }
585 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
586
587 void obd_cleanup_caches(void)
588 {
589         int rc;
590
591         ENTRY;
592         if (obd_device_cachep) {
593                 rc = cfs_mem_cache_destroy(obd_device_cachep);
594                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
595                 obd_device_cachep = NULL;
596         }
597         if (obdo_cachep) {
598                 rc = cfs_mem_cache_destroy(obdo_cachep);
599                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
600                 obdo_cachep = NULL;
601         }
602         if (import_cachep) {
603                 rc = cfs_mem_cache_destroy(import_cachep);
604                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
605                 import_cachep = NULL;
606         }
607         if (capa_cachep) {
608                 rc = cfs_mem_cache_destroy(capa_cachep);
609                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
610                 capa_cachep = NULL;
611         }
612         EXIT;
613 }
614
615 int obd_init_caches(void)
616 {
617         ENTRY;
618
619         LASSERT(obd_device_cachep == NULL);
620         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
621                                                  sizeof(struct obd_device),
622                                                  0, 0);
623         if (!obd_device_cachep)
624                 GOTO(out, -ENOMEM);
625
626         LASSERT(obdo_cachep == NULL);
627         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
628                                            0, 0);
629         if (!obdo_cachep)
630                 GOTO(out, -ENOMEM);
631
632         LASSERT(import_cachep == NULL);
633         import_cachep = cfs_mem_cache_create("ll_import_cache",
634                                              sizeof(struct obd_import),
635                                              0, 0);
636         if (!import_cachep)
637                 GOTO(out, -ENOMEM);
638
639         LASSERT(capa_cachep == NULL);
640         capa_cachep = cfs_mem_cache_create("capa_cache",
641                                            sizeof(struct obd_capa), 0, 0);
642         if (!capa_cachep)
643                 GOTO(out, -ENOMEM);
644
645         RETURN(0);
646  out:
647         obd_cleanup_caches();
648         RETURN(-ENOMEM);
649
650 }
651
652 /* map connection to client */
653 struct obd_export *class_conn2export(struct lustre_handle *conn)
654 {
655         struct obd_export *export;
656         ENTRY;
657
658         if (!conn) {
659                 CDEBUG(D_CACHE, "looking for null handle\n");
660                 RETURN(NULL);
661         }
662
663         if (conn->cookie == -1) {  /* this means assign a new connection */
664                 CDEBUG(D_CACHE, "want a new connection\n");
665                 RETURN(NULL);
666         }
667
668         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
669         export = class_handle2object(conn->cookie);
670         RETURN(export);
671 }
672
673 struct obd_device *class_exp2obd(struct obd_export *exp)
674 {
675         if (exp)
676                 return exp->exp_obd;
677         return NULL;
678 }
679
680 struct obd_device *class_conn2obd(struct lustre_handle *conn)
681 {
682         struct obd_export *export;
683         export = class_conn2export(conn);
684         if (export) {
685                 struct obd_device *obd = export->exp_obd;
686                 class_export_put(export);
687                 return obd;
688         }
689         return NULL;
690 }
691
692 struct obd_import *class_exp2cliimp(struct obd_export *exp)
693 {
694         struct obd_device *obd = exp->exp_obd;
695         if (obd == NULL)
696                 return NULL;
697         return obd->u.cli.cl_import;
698 }
699
700 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
701 {
702         struct obd_device *obd = class_conn2obd(conn);
703         if (obd == NULL)
704                 return NULL;
705         return obd->u.cli.cl_import;
706 }
707
708 /* Export management functions */
709 static void class_export_destroy(struct obd_export *exp)
710 {
711         struct obd_device *obd = exp->exp_obd;
712         ENTRY;
713
714         LASSERT (atomic_read(&exp->exp_refcount) == 0);
715
716         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
717                exp->exp_client_uuid.uuid, obd->obd_name);
718
719         LASSERT(obd != NULL);
720
721         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
722         if (exp->exp_connection)
723                 ptlrpc_put_connection_superhack(exp->exp_connection);
724
725         LASSERT(list_empty(&exp->exp_outstanding_replies));
726         LASSERT(list_empty(&exp->exp_uncommitted_replies));
727         LASSERT(list_empty(&exp->exp_req_replay_queue));
728         LASSERT(list_empty(&exp->exp_queued_rpc));
729         obd_destroy_export(exp);
730         class_decref(obd, "export", exp);
731
732         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
733         EXIT;
734 }
735
/* Addref callback registered with class_handle_hash() for exports: takes
 * one more reference on the export passed in as an untyped pointer. */
static void export_handle_addref(void *export)
{
        (void)class_export_get(export);
}
740
741 struct obd_export *class_export_get(struct obd_export *exp)
742 {
743         atomic_inc(&exp->exp_refcount);
744         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
745                atomic_read(&exp->exp_refcount));
746         return exp;
747 }
748 EXPORT_SYMBOL(class_export_get);
749
750 void class_export_put(struct obd_export *exp)
751 {
752         LASSERT(exp != NULL);
753         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
754                atomic_read(&exp->exp_refcount) - 1);
755         LASSERT(atomic_read(&exp->exp_refcount) > 0);
756         LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
757
758         if (atomic_dec_and_test(&exp->exp_refcount)) {
759                 CDEBUG(D_IOCTL, "final put %p/%s\n",
760                        exp, exp->exp_client_uuid.uuid);
761                 obd_zombie_export_add(exp);
762         }
763 }
764 EXPORT_SYMBOL(class_export_put);
765
766 /* Creates a new export, adds it to the hash table, and returns a
767  * pointer to it. The refcount is 2: one for the hash reference, and
768  * one for the pointer returned by this function. */
769 struct obd_export *class_new_export(struct obd_device *obd,
770                                     struct obd_uuid *cluuid)
771 {
772         struct obd_export *export;
773         int rc = 0;
774
775         OBD_ALLOC_PTR(export);
776         if (!export)
777                 return ERR_PTR(-ENOMEM);
778
779         export->exp_conn_cnt = 0;
780         export->exp_lock_hash = NULL;
781         atomic_set(&export->exp_refcount, 2);
782         atomic_set(&export->exp_rpc_count, 0);
783         export->exp_obd = obd;
784         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
785         spin_lock_init(&export->exp_uncommitted_replies_lock);
786         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
787         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
788         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
789         CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
790         class_handle_hash(&export->exp_handle, export_handle_addref);
791         export->exp_last_request_time = cfs_time_current_sec();
792         spin_lock_init(&export->exp_lock);
793         INIT_HLIST_NODE(&export->exp_uuid_hash);
794         INIT_HLIST_NODE(&export->exp_nid_hash);
795
796         export->exp_sp_peer = LUSTRE_SP_ANY;
797         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
798         export->exp_client_uuid = *cluuid;
799         obd_init_export(export);
800
801         spin_lock(&obd->obd_dev_lock);
802         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
803                 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
804                                             &export->exp_uuid_hash);
805                 if (rc != 0) {
806                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
807                                       obd->obd_name, cluuid->uuid, rc);
808                         spin_unlock(&obd->obd_dev_lock);
809                         class_handle_unhash(&export->exp_handle);
810                         OBD_FREE_PTR(export);
811                         return ERR_PTR(-EALREADY);
812                 }
813         }
814
815         LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
816         class_incref(obd, "export", export);
817         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
818         list_add_tail(&export->exp_obd_chain_timed,
819                       &export->exp_obd->obd_exports_timed);
820         export->exp_obd->obd_num_exports++;
821         spin_unlock(&obd->obd_dev_lock);
822
823         return export;
824 }
825 EXPORT_SYMBOL(class_new_export);
826
827 void class_unlink_export(struct obd_export *exp)
828 {
829         class_handle_unhash(&exp->exp_handle);
830
831         spin_lock(&exp->exp_obd->obd_dev_lock);
832         /* delete an uuid-export hashitem from hashtables */
833         if (!hlist_unhashed(&exp->exp_uuid_hash))
834                 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
835                                 &exp->exp_client_uuid,
836                                 &exp->exp_uuid_hash);
837
838         list_del_init(&exp->exp_obd_chain);
839         list_del_init(&exp->exp_obd_chain_timed);
840         exp->exp_obd->obd_num_exports--;
841         spin_unlock(&exp->exp_obd->obd_dev_lock);
842
843         /* Keep these counter valid always */
844         spin_lock_bh(&exp->exp_obd->obd_processing_task_lock);
845         if (exp->exp_delayed)
846                 exp->exp_obd->obd_delayed_clients--;
847         else if (exp->exp_in_recovery)
848                 exp->exp_obd->obd_recoverable_clients--;
849         else if (exp->exp_obd->obd_recovering)
850                 exp->exp_obd->obd_max_recoverable_clients--;
851         spin_unlock_bh(&exp->exp_obd->obd_processing_task_lock);
852         class_export_put(exp);
853 }
854 EXPORT_SYMBOL(class_unlink_export);
855
856 /* Import management functions */
857 void class_import_destroy(struct obd_import *imp)
858 {
859         ENTRY;
860
861         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
862                 imp->imp_obd->obd_name);
863
864         LASSERT(atomic_read(&imp->imp_refcount) == 0);
865
866         ptlrpc_put_connection_superhack(imp->imp_connection);
867
868         while (!list_empty(&imp->imp_conn_list)) {
869                 struct obd_import_conn *imp_conn;
870
871                 imp_conn = list_entry(imp->imp_conn_list.next,
872                                       struct obd_import_conn, oic_item);
873                 list_del_init(&imp_conn->oic_item);
874                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
875                 OBD_FREE(imp_conn, sizeof(*imp_conn));
876         }
877
878         LASSERT(imp->imp_sec == NULL);
879         class_decref(imp->imp_obd, "import", imp);
880         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
881         EXIT;
882 }
883
/* Handle add-reference callback registered with class_handle_hash() in
 * class_new_import(): takes one reference on the import behind the handle. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
888
889 struct obd_import *class_import_get(struct obd_import *import)
890 {
891         LASSERT(atomic_read(&import->imp_refcount) >= 0);
892         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
893         atomic_inc(&import->imp_refcount);
894         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
895                atomic_read(&import->imp_refcount), 
896                import->imp_obd->obd_name);
897         return import;
898 }
899 EXPORT_SYMBOL(class_import_get);
900
901 void class_import_put(struct obd_import *imp)
902 {
903         ENTRY;
904
905         LASSERT(atomic_read(&imp->imp_refcount) > 0);
906         LASSERT(atomic_read(&imp->imp_refcount) < 0x5a5a5a);
907         LASSERT(list_empty(&imp->imp_zombie_chain));
908
909         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
910                atomic_read(&imp->imp_refcount) - 1, 
911                imp->imp_obd->obd_name);
912
913         if (atomic_dec_and_test(&imp->imp_refcount)) {
914                 CDEBUG(D_INFO, "final put import %p\n", imp);
915                 obd_zombie_import_add(imp);
916         }
917
918         EXIT;
919 }
920 EXPORT_SYMBOL(class_import_put);
921
922 static void init_imp_at(struct imp_at *at) {
923         int i;
924         at_init(&at->iat_net_latency, 0, 0);
925         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
926                 /* max service estimates are tracked on the server side, so
927                    don't use the AT history here, just use the last reported
928                    val. (But keep hist for proc histogram, worst_ever) */
929                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
930                         AT_FLG_NOHIST);
931         }
932 }
933
934 struct obd_import *class_new_import(struct obd_device *obd)
935 {
936         struct obd_import *imp;
937
938         OBD_ALLOC(imp, sizeof(*imp));
939         if (imp == NULL)
940                 return NULL;
941
942         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
943         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
944         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
945         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
946         spin_lock_init(&imp->imp_lock);
947         imp->imp_last_success_conn = 0;
948         imp->imp_state = LUSTRE_IMP_NEW;
949         imp->imp_obd = class_incref(obd, "import", imp);
950         sema_init(&imp->imp_sec_mutex, 1);
951         cfs_waitq_init(&imp->imp_recovery_waitq);
952
953         atomic_set(&imp->imp_refcount, 2);
954         atomic_set(&imp->imp_unregistering, 0);
955         atomic_set(&imp->imp_inflight, 0);
956         atomic_set(&imp->imp_replay_inflight, 0);
957         atomic_set(&imp->imp_inval_count, 0);
958         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
959         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
960         class_handle_hash(&imp->imp_handle, import_handle_addref);
961         init_imp_at(&imp->imp_at);
962
963         /* the default magic is V2, will be used in connect RPC, and
964          * then adjusted according to the flags in request/reply. */
965         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
966
967         return imp;
968 }
969 EXPORT_SYMBOL(class_new_import);
970
971 void class_destroy_import(struct obd_import *import)
972 {
973         LASSERT(import != NULL);
974         LASSERT(import != LP_POISON);
975
976         class_handle_unhash(&import->imp_handle);
977
978         spin_lock(&import->imp_lock);
979         import->imp_generation++;
980         spin_unlock(&import->imp_lock);
981         class_import_put(import);
982 }
983 EXPORT_SYMBOL(class_destroy_import);
984
985 /* A connection defines an export context in which preallocation can
986    be managed. This releases the export pointer reference, and returns
987    the export handle, so the export refcount is 1 when this function
988    returns. */
989 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
990                   struct obd_uuid *cluuid)
991 {
992         struct obd_export *export;
993         LASSERT(conn != NULL);
994         LASSERT(obd != NULL);
995         LASSERT(cluuid != NULL);
996         ENTRY;
997
998         export = class_new_export(obd, cluuid);
999         if (IS_ERR(export))
1000                 RETURN(PTR_ERR(export));
1001
1002         conn->cookie = export->exp_handle.h_cookie;
1003         class_export_put(export);
1004
1005         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1006                cluuid->uuid, conn->cookie);
1007         RETURN(0);
1008 }
1009 EXPORT_SYMBOL(class_connect);
1010
1011 /* if export is involved in recovery then clean up related things */
1012 void class_export_recovery_cleanup(struct obd_export *exp)
1013 {
1014         struct obd_device *obd = exp->exp_obd;
1015
1016         spin_lock_bh(&obd->obd_processing_task_lock);
1017         if (obd->obd_recovering && exp->exp_in_recovery) {
1018                 spin_lock(&exp->exp_lock);
1019                 exp->exp_in_recovery = 0;
1020                 spin_unlock(&exp->exp_lock);
1021                 obd->obd_connected_clients--;
1022                 /* each connected client is counted as recoverable */
1023                 obd->obd_recoverable_clients--;
1024                 if (exp->exp_req_replay_needed) {
1025                         spin_lock(&exp->exp_lock);
1026                         exp->exp_req_replay_needed = 0;
1027                         spin_unlock(&exp->exp_lock);
1028                         LASSERT(atomic_read(&obd->obd_req_replay_clients));
1029                         atomic_dec(&obd->obd_req_replay_clients);
1030                 }
1031                 if (exp->exp_lock_replay_needed) {
1032                         spin_lock(&exp->exp_lock);
1033                         exp->exp_lock_replay_needed = 0;
1034                         spin_unlock(&exp->exp_lock);
1035                         LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1036                         atomic_dec(&obd->obd_lock_replay_clients);
1037                 }
1038         }
1039         spin_unlock_bh(&obd->obd_processing_task_lock);
1040 }
1041
1042 /* This function removes 1-3 references from the export:
1043  * 1 - for export pointer passed
1044  * and if disconnect really need
1045  * 2 - removing from hash
1046  * 3 - in client_unlink_export
1047  * The export pointer passed to this function can destroyed */
1048 int class_disconnect(struct obd_export *export)
1049 {
1050         int already_disconnected;
1051         ENTRY;
1052
1053         if (export == NULL) {
1054                 fixme();
1055                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1056                 RETURN(-EINVAL);
1057         }
1058
1059         spin_lock(&export->exp_lock);
1060         already_disconnected = export->exp_disconnected;
1061         export->exp_disconnected = 1;
1062         spin_unlock(&export->exp_lock);
1063
1064         /* class_cleanup(), abort_recovery(), and class_fail_export()
1065          * all end up in here, and if any of them race we shouldn't
1066          * call extra class_export_puts(). */
1067         if (already_disconnected)
1068                 GOTO(no_disconn, already_disconnected);
1069
1070         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1071                export->exp_handle.h_cookie);
1072
1073         if (!hlist_unhashed(&export->exp_nid_hash))
1074                 lustre_hash_del(export->exp_obd->obd_nid_hash,
1075                                 &export->exp_connection->c_peer.nid,
1076                                 &export->exp_nid_hash);
1077
1078         class_export_recovery_cleanup(export);
1079         class_unlink_export(export);
1080 no_disconn:
1081         class_export_put(export);
1082         RETURN(0);
1083 }
1084
1085 static void class_disconnect_export_list(struct list_head *list,
1086                                          enum obd_option flags)
1087 {
1088         int rc;
1089         struct obd_export *exp;
1090         ENTRY;
1091
1092         /* It's possible that an export may disconnect itself, but
1093          * nothing else will be added to this list. */
1094         while (!list_empty(list)) {
1095                 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1096                 /* need for safe call CDEBUG after obd_disconnect */
1097                 class_export_get(exp);
1098
1099                 spin_lock(&exp->exp_lock);
1100                 exp->exp_flags = flags;
1101                 spin_unlock(&exp->exp_lock);
1102
1103                 if (obd_uuid_equals(&exp->exp_client_uuid,
1104                                     &exp->exp_obd->obd_uuid)) {
1105                         CDEBUG(D_HA,
1106                                "exp %p export uuid == obd uuid, don't discon\n",
1107                                exp);
1108                         /* Need to delete this now so we don't end up pointing
1109                          * to work_list later when this export is cleaned up. */
1110                         list_del_init(&exp->exp_obd_chain);
1111                         class_export_put(exp);
1112                         continue;
1113                 }
1114
1115                 class_export_get(exp);
1116                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1117                        "last request at "CFS_TIME_T"\n",
1118                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1119                        exp, exp->exp_last_request_time);
1120                 /* release one export reference anyway */
1121                 rc = obd_disconnect(exp);
1122
1123                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1124                        obd_export_nid2str(exp), exp, rc);
1125                 class_export_put(exp);
1126         }
1127         EXIT;
1128 }
1129
1130 void class_disconnect_exports(struct obd_device *obd)
1131 {
1132         struct list_head work_list;
1133         ENTRY;
1134
1135         /* Move all of the exports from obd_exports to a work list, en masse. */
1136         CFS_INIT_LIST_HEAD(&work_list);
1137         spin_lock(&obd->obd_dev_lock);
1138         list_splice_init(&obd->obd_exports, &work_list);
1139         list_splice_init(&obd->obd_delayed_exports, &work_list);
1140         spin_unlock(&obd->obd_dev_lock);
1141
1142         if (!list_empty(&work_list)) {
1143                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1144                        "disconnecting them\n", obd->obd_minor, obd);
1145                 class_disconnect_export_list(&work_list,
1146                                              exp_flags_from_obd(obd));
1147         } else
1148                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1149                        obd->obd_minor, obd);
1150         EXIT;
1151 }
1152 EXPORT_SYMBOL(class_disconnect_exports);
1153
1154 /* Remove exports that have not completed recovery.
1155  */
1156 void class_disconnect_stale_exports(struct obd_device *obd,
1157                                     int (*test_export)(struct obd_export *),
1158                                     enum obd_option flags)
1159 {
1160         struct list_head work_list;
1161         struct list_head *pos, *n;
1162         struct obd_export *exp;
1163         ENTRY;
1164
1165         CFS_INIT_LIST_HEAD(&work_list);
1166         spin_lock(&obd->obd_dev_lock);
1167         obd->obd_stale_clients = 0;
1168         list_for_each_safe(pos, n, &obd->obd_exports) {
1169                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1170                 if (test_export(exp))
1171                         continue;
1172
1173                 list_move(&exp->exp_obd_chain, &work_list);
1174                 /* don't count self-export as client */
1175                 if (obd_uuid_equals(&exp->exp_client_uuid,
1176                                      &exp->exp_obd->obd_uuid))
1177                         continue;
1178
1179                 obd->obd_stale_clients++;
1180                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1181                        obd->obd_name, exp->exp_client_uuid.uuid,
1182                        exp->exp_connection == NULL ? "<unknown>" :
1183                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1184         }
1185         spin_unlock(&obd->obd_dev_lock);
1186
1187         CDEBUG(D_HA, "%s: disconnecting %d stale clients\n", obd->obd_name,
1188                obd->obd_stale_clients);
1189
1190         class_disconnect_export_list(&work_list, flags);
1191         EXIT;
1192 }
1193 EXPORT_SYMBOL(class_disconnect_stale_exports);
1194
1195 void class_fail_export(struct obd_export *exp)
1196 {
1197         int rc, already_failed;
1198
1199         spin_lock(&exp->exp_lock);
1200         already_failed = exp->exp_failed;
1201         exp->exp_failed = 1;
1202         spin_unlock(&exp->exp_lock);
1203
1204         if (already_failed) {
1205                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1206                        exp, exp->exp_client_uuid.uuid);
1207                 return;
1208         }
1209
1210         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1211                exp, exp->exp_client_uuid.uuid);
1212
1213         if (obd_dump_on_timeout)
1214                 libcfs_debug_dumplog();
1215
1216         /* Most callers into obd_disconnect are removing their own reference
1217          * (request, for example) in addition to the one from the hash table.
1218          * We don't have such a reference here, so make one. */
1219         class_export_get(exp);
1220         rc = obd_disconnect(exp);
1221         if (rc)
1222                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1223         else
1224                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1225                        exp, exp->exp_client_uuid.uuid);
1226 }
1227 EXPORT_SYMBOL(class_fail_export);
1228
1229 char *obd_export_nid2str(struct obd_export *exp)
1230 {
1231         if (exp->exp_connection != NULL)
1232                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1233
1234         return "(no nid)";
1235 }
1236 EXPORT_SYMBOL(obd_export_nid2str);
1237
1238 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1239 {
1240         struct obd_export *doomed_exp = NULL;
1241         int exports_evicted = 0;
1242
1243         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1244
1245         do {
1246                 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1247                 if (doomed_exp == NULL)
1248                         break;
1249
1250                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1251                          "nid %s found, wanted nid %s, requested nid %s\n",
1252                          obd_export_nid2str(doomed_exp),
1253                          libcfs_nid2str(nid_key), nid);
1254                 LASSERTF(doomed_exp != obd->obd_self_export,
1255                          "self-export is hashed by NID?\n");
1256                 exports_evicted++;
1257                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1258                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1259                        exports_evicted);
1260                 class_fail_export(doomed_exp);
1261                 class_export_put(doomed_exp);
1262         } while (1);
1263
1264         if (!exports_evicted)
1265                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1266                        obd->obd_name, nid);
1267         return exports_evicted;
1268 }
1269 EXPORT_SYMBOL(obd_export_evict_by_nid);
1270
1271 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1272 {
1273         struct obd_export *doomed_exp = NULL;
1274         struct obd_uuid doomed_uuid;
1275         int exports_evicted = 0;
1276
1277         obd_str2uuid(&doomed_uuid, uuid);
1278         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1279                 CERROR("%s: can't evict myself\n", obd->obd_name);
1280                 return exports_evicted;
1281         }
1282
1283         doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1284
1285         if (doomed_exp == NULL) {
1286                 CERROR("%s: can't disconnect %s: no exports found\n",
1287                        obd->obd_name, uuid);
1288         } else {
1289                 CWARN("%s: evicting %s at adminstrative request\n",
1290                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1291                 class_fail_export(doomed_exp);
1292                 class_export_put(doomed_exp);
1293                 exports_evicted++;
1294         }
1295
1296         return exports_evicted;
1297 }
1298 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1299
1300 /**
1301  * kill zombie imports and exports
1302  */
1303 void obd_zombie_impexp_cull(void)
1304 {
1305         struct obd_import *import;
1306         struct obd_export *export;
1307         ENTRY;
1308
1309         do {
1310                 spin_lock(&obd_zombie_impexp_lock);
1311
1312                 import = NULL;
1313                 if (!list_empty(&obd_zombie_imports)) {
1314                         import = list_entry(obd_zombie_imports.next,
1315                                             struct obd_import,
1316                                             imp_zombie_chain);
1317                         list_del_init(&import->imp_zombie_chain);
1318                 }
1319
1320                 export = NULL;
1321                 if (!list_empty(&obd_zombie_exports)) {
1322                         export = list_entry(obd_zombie_exports.next,
1323                                             struct obd_export,
1324                                             exp_obd_chain);
1325                         list_del_init(&export->exp_obd_chain);
1326                 }
1327
1328                 spin_unlock(&obd_zombie_impexp_lock);
1329
1330                 if (import != NULL)
1331                         class_import_destroy(import);
1332
1333                 if (export != NULL)
1334                         class_export_destroy(export);
1335
1336         } while (import != NULL || export != NULL);
1337         EXIT;
1338 }
1339
1340 static struct completion        obd_zombie_start;
1341 static struct completion        obd_zombie_stop;
1342 static unsigned long            obd_zombie_flags;
1343 static cfs_waitq_t              obd_zombie_waitq;
1344
1345 enum {
1346         OBD_ZOMBIE_STOP   = 1 << 1
1347 };
1348
1349 /**
1350  * check for work for kill zombie import/export thread.
1351  */
1352 static int obd_zombie_impexp_check(void *arg)
1353 {
1354         int rc;
1355
1356         spin_lock(&obd_zombie_impexp_lock);
1357         rc = list_empty(&obd_zombie_imports) &&
1358              list_empty(&obd_zombie_exports) &&
1359              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1360
1361         spin_unlock(&obd_zombie_impexp_lock);
1362
1363         RETURN(rc);
1364 }
1365
1366 /**
1367  * Add export to the obd_zombe thread and notify it.
1368  */
1369 static void obd_zombie_export_add(struct obd_export *exp) {
1370         spin_lock(&obd_zombie_impexp_lock);
1371         LASSERT(list_empty(&exp->exp_obd_chain));
1372         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1373         spin_unlock(&obd_zombie_impexp_lock);
1374
1375         if (obd_zombie_impexp_notify != NULL)
1376                 obd_zombie_impexp_notify();
1377 }
1378
1379 /**
1380  * Add import to the obd_zombe thread and notify it.
1381  */
1382 static void obd_zombie_import_add(struct obd_import *imp) {
1383         LASSERT(imp->imp_sec == NULL);
1384         spin_lock(&obd_zombie_impexp_lock);
1385         LASSERT(list_empty(&imp->imp_zombie_chain));
1386         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1387         spin_unlock(&obd_zombie_impexp_lock);
1388
1389         if (obd_zombie_impexp_notify != NULL)
1390                 obd_zombie_impexp_notify();
1391 }
1392
1393 /**
1394  * notify import/export destroy thread about new zombie.
1395  */
1396 static void obd_zombie_impexp_notify(void)
1397 {
1398         cfs_waitq_signal(&obd_zombie_waitq);
1399 }
1400
1401 /**
1402  * check whether obd_zombie is idle
1403  */
1404 static int obd_zombie_is_idle(void)
1405 {
1406         int rc;
1407
1408         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1409         spin_lock(&obd_zombie_impexp_lock);
1410         rc = list_empty(&obd_zombie_imports) &&
1411              list_empty(&obd_zombie_exports);
1412         spin_unlock(&obd_zombie_impexp_lock);
1413         return rc;
1414 }
1415
1416 /**
1417  * wait when obd_zombie import/export queues become empty
1418  */
1419 void obd_zombie_barrier(void)
1420 {
1421         struct l_wait_info lwi = { 0 };
1422         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1423 }
1424 EXPORT_SYMBOL(obd_zombie_barrier);
1425
1426 #ifdef __KERNEL__
1427
1428 /**
1429  * destroy zombie export/import thread.
1430  */
1431 static int obd_zombie_impexp_thread(void *unused)
1432 {
1433         int rc;
1434
1435         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1436                 complete(&obd_zombie_start);
1437                 RETURN(rc);
1438         }
1439
1440         complete(&obd_zombie_start);
1441
1442         while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1443                 struct l_wait_info lwi = { 0 };
1444
1445                 l_wait_event(obd_zombie_waitq, 
1446                              !obd_zombie_impexp_check(NULL), &lwi);
1447                 obd_zombie_impexp_cull();
1448
1449                 /* 
1450                  * Notify obd_zombie_barrier callers that queues
1451                  * may be empty.
1452                  */
1453                 cfs_waitq_signal(&obd_zombie_waitq);
1454         }
1455
1456         complete(&obd_zombie_stop);
1457
1458         RETURN(0);
1459 }
1460
1461 #else /* ! KERNEL */
1462
1463 static atomic_t zombie_recur = ATOMIC_INIT(0);
1464 static void *obd_zombie_impexp_work_cb;
1465 static void *obd_zombie_impexp_idle_cb;
1466
1467 int obd_zombie_impexp_kill(void *arg)
1468 {
1469         int rc = 0;
1470
1471         if (atomic_inc_return(&zombie_recur) == 1) {
1472                 obd_zombie_impexp_cull();
1473                 rc = 1;
1474         }
1475         atomic_dec(&zombie_recur);
1476         return rc;
1477 }
1478
1479 #endif
1480
1481 /**
1482  * start destroy zombie import/export thread
1483  */
1484 int obd_zombie_impexp_init(void)
1485 {
1486         int rc;
1487
1488         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1489         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1490         spin_lock_init(&obd_zombie_impexp_lock);
1491         init_completion(&obd_zombie_start);
1492         init_completion(&obd_zombie_stop);
1493         cfs_waitq_init(&obd_zombie_waitq);
1494
1495 #ifdef __KERNEL__
1496         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1497         if (rc < 0)
1498                 RETURN(rc);
1499
1500         wait_for_completion(&obd_zombie_start);
1501 #else
1502
1503         obd_zombie_impexp_work_cb =
1504                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1505                                                  &obd_zombie_impexp_kill, NULL);
1506
1507         obd_zombie_impexp_idle_cb =
1508                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1509                                                  &obd_zombie_impexp_check, NULL);
1510         rc = 0;
1511 #endif
1512         RETURN(rc);
1513 }
1514 /**
1515  * stop destroy zombie import/export thread
1516  */
1517 void obd_zombie_impexp_stop(void)
1518 {
1519         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1520         obd_zombie_impexp_notify();
1521 #ifdef __KERNEL__
1522         wait_for_completion(&obd_zombie_stop);
1523 #else
1524         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1525         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1526 #endif
1527 }