Whamcloud - gitweb
branch: HEAD
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  *
24  * These are the only exported functions, they provide some generic
25  * infrastructure for managing object devices
26  */
27
28 #define DEBUG_SUBSYSTEM S_CLASS
29 #ifndef __KERNEL__
30 #include <liblustre.h>
31 #endif
32 #include <obd_ost.h>
33 #include <obd_class.h>
34 #include <lprocfs_status.h>
35 #include <class_hash.h>
36
37 extern struct list_head obd_types;
38 spinlock_t obd_types_lock;
39
40 cfs_mem_cache_t *obd_device_cachep;
41 cfs_mem_cache_t *obdo_cachep;
42 EXPORT_SYMBOL(obdo_cachep);
43 cfs_mem_cache_t *import_cachep;
44
45 struct list_head  obd_zombie_imports;
46 struct list_head  obd_zombie_exports;
47 spinlock_t        obd_zombie_impexp_lock;
48 static void obd_zombie_impexp_notify(void);
49
50 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
51
52 /*
53  * support functions: we could use inter-module communication, but this
54  * is more portable to other OS's
55  */
56 static struct obd_device *obd_device_alloc(void)
57 {
58         struct obd_device *obd;
59
60         OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
61         if (obd != NULL) {
62                 obd->obd_magic = OBD_DEVICE_MAGIC;
63         }
64         return obd;
65 }
66 EXPORT_SYMBOL(obd_device_alloc);
67
68 static void obd_device_free(struct obd_device *obd)
69 {
70         LASSERT(obd != NULL);
71         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n", 
72                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
73         if (obd->obd_namespace != NULL) {
74                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n", 
75                        obd, obd->obd_namespace, obd->obd_force);
76                 LBUG();
77         }
78         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
79 }
80 EXPORT_SYMBOL(obd_device_free);
81
82 struct obd_type *class_search_type(const char *name)
83 {
84         struct list_head *tmp;
85         struct obd_type *type;
86
87         spin_lock(&obd_types_lock);
88         list_for_each(tmp, &obd_types) {
89                 type = list_entry(tmp, struct obd_type, typ_chain);
90                 if (strcmp(type->typ_name, name) == 0) {
91                         spin_unlock(&obd_types_lock);
92                         return type;
93                 }
94         }
95         spin_unlock(&obd_types_lock);
96         return NULL;
97 }
98
99 struct obd_type *class_get_type(const char *name)
100 {
101         struct obd_type *type = class_search_type(name);
102
103 #ifdef CONFIG_KMOD
104         if (!type) {
105                 const char *modname = name;
106                 if (!request_module(modname)) {
107                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
108                         type = class_search_type(name);
109                 } else {
110                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
111                                            modname);
112                 }
113         }
114 #endif
115         if (type) {
116                 spin_lock(&type->obd_type_lock);
117                 type->typ_refcnt++;
118                 try_module_get(type->typ_dt_ops->o_owner);
119                 spin_unlock(&type->obd_type_lock);
120         }
121         return type;
122 }
123
124 void class_put_type(struct obd_type *type)
125 {
126         LASSERT(type);
127         spin_lock(&type->obd_type_lock);
128         type->typ_refcnt--;
129         module_put(type->typ_dt_ops->o_owner);
130         spin_unlock(&type->obd_type_lock);
131 }
132
133 #define CLASS_MAX_NAME 1024
134
135 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops, 
136                         struct lprocfs_vars *vars, const char *name, 
137                         struct lu_device_type *ldt)
138 {
139         struct obd_type *type;
140         int rc = 0;
141         ENTRY;
142
143         /* sanity check */
144         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
145
146         if (class_search_type(name)) {
147                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
148                 RETURN(-EEXIST);
149         }
150
151         rc = -ENOMEM;
152         OBD_ALLOC(type, sizeof(*type));
153         if (type == NULL)
154                 RETURN(rc);
155
156         OBD_ALLOC_PTR(type->typ_dt_ops);
157         OBD_ALLOC_PTR(type->typ_md_ops);
158         OBD_ALLOC(type->typ_name, strlen(name) + 1);
159         
160         if (type->typ_dt_ops == NULL || 
161             type->typ_md_ops == NULL || 
162             type->typ_name == NULL)
163                 GOTO (failed, rc);
164
165         *(type->typ_dt_ops) = *dt_ops;
166         /* md_ops is optional */
167         if (md_ops)
168                 *(type->typ_md_ops) = *md_ops;
169         strcpy(type->typ_name, name);
170         spin_lock_init(&type->obd_type_lock);
171
172 #ifdef LPROCFS
173         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
174                                               vars, type);
175         if (IS_ERR(type->typ_procroot)) {
176                 rc = PTR_ERR(type->typ_procroot);
177                 type->typ_procroot = NULL;
178                 GOTO (failed, rc);
179         }
180 #endif
181         if (ldt != NULL) {
182                 type->typ_lu = ldt;
183                 rc = ldt->ldt_ops->ldto_init(ldt);
184                 if (rc != 0)
185                         GOTO (failed, rc);
186         }
187
188         spin_lock(&obd_types_lock);
189         list_add(&type->typ_chain, &obd_types);
190         spin_unlock(&obd_types_lock);
191
192         RETURN (0);
193
194  failed:
195         if (type->typ_name != NULL)
196                 OBD_FREE(type->typ_name, strlen(name) + 1);
197         if (type->typ_md_ops != NULL)
198                 OBD_FREE_PTR(type->typ_md_ops);
199         if (type->typ_dt_ops != NULL)
200                 OBD_FREE_PTR(type->typ_dt_ops);
201         OBD_FREE(type, sizeof(*type));
202         RETURN(rc);
203 }
204
205 int class_unregister_type(const char *name)
206 {
207         struct obd_type *type = class_search_type(name);
208         ENTRY;
209
210         if (!type) {
211                 CERROR("unknown obd type\n");
212                 RETURN(-EINVAL);
213         }
214
215         if (type->typ_refcnt) {
216                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
217                 /* This is a bad situation, let's make the best of it */
218                 /* Remove ops, but leave the name for debugging */
219                 OBD_FREE_PTR(type->typ_dt_ops);
220                 OBD_FREE_PTR(type->typ_md_ops);
221                 RETURN(-EBUSY);
222         }
223
224         if (type->typ_procroot) {
225                 lprocfs_remove(&type->typ_procroot);
226         }
227
228         if (type->typ_lu)
229                 type->typ_lu->ldt_ops->ldto_fini(type->typ_lu);
230
231         spin_lock(&obd_types_lock);
232         list_del(&type->typ_chain);
233         spin_unlock(&obd_types_lock);
234         OBD_FREE(type->typ_name, strlen(name) + 1);
235         if (type->typ_dt_ops != NULL)
236                 OBD_FREE_PTR(type->typ_dt_ops);
237         if (type->typ_md_ops != NULL)
238                 OBD_FREE_PTR(type->typ_md_ops);
239         OBD_FREE(type, sizeof(*type));
240         RETURN(0);
241 } /* class_unregister_type */
242
243 struct obd_device *class_newdev(const char *type_name, const char *name)
244 {
245         struct obd_device *result = NULL;
246         struct obd_device *newdev;
247         struct obd_type *type = NULL;
248         int i;
249         int new_obd_minor = 0;
250
251         if (strlen(name) > MAX_OBD_NAME) {
252                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
253                 RETURN(ERR_PTR(-EINVAL));
254         }
255
256         type = class_get_type(type_name); 
257         if (type == NULL){
258                 CERROR("OBD: unknown type: %s\n", type_name);
259                 RETURN(ERR_PTR(-ENODEV));
260         }
261
262         newdev = obd_device_alloc();
263         if (newdev == NULL) { 
264                 class_put_type(type);
265                 RETURN(ERR_PTR(-ENOMEM));
266         }
267         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
268
269         spin_lock(&obd_dev_lock);
270         for (i = 0; i < class_devno_max(); i++) {
271                 struct obd_device *obd = class_num2obd(i);
272                 if (obd && obd->obd_name &&
273                     (strcmp(name, obd->obd_name) == 0)) {
274                         CERROR("Device %s already exists, won't add\n", name);
275                         if (result) {
276                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
277                                          "%p obd_magic %08x != %08x\n", result,
278                                          result->obd_magic, OBD_DEVICE_MAGIC);
279                                 LASSERTF(result->obd_minor == new_obd_minor,
280                                          "%p obd_minor %d != %d\n", result,
281                                          result->obd_minor, new_obd_minor);
282
283                                 obd_devs[result->obd_minor] = NULL;
284                                 result->obd_name[0]='\0';
285                          }
286                         result = ERR_PTR(-EEXIST);
287                         break;
288                 }
289                 if (!result && !obd) {
290                         result = newdev;
291                         result->obd_minor = i;
292                         new_obd_minor = i;
293                         result->obd_type = type;
294                         memcpy(result->obd_name, name, strlen(name));
295                         obd_devs[i] = result;
296                 }
297         }
298         spin_unlock(&obd_dev_lock);
299         
300         if (result == NULL && i >= class_devno_max()) {
301                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
302                        class_devno_max());
303                 result = ERR_PTR(-EOVERFLOW);
304         }
305         
306         if (IS_ERR(result)) {
307                 obd_device_free(newdev);
308                 class_put_type(type);
309         } else {
310                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
311                        result->obd_name, result);
312         }
313         return result;
314 }
315
316 void class_release_dev(struct obd_device *obd)
317 {
318         struct obd_type *obd_type = obd->obd_type;
319
320         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
321                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
322         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
323                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
324         LASSERT(obd_type != NULL);
325
326         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
327                obd->obd_name,obd->obd_type->typ_name);
328
329         spin_lock(&obd_dev_lock);
330         obd_devs[obd->obd_minor] = NULL;
331         spin_unlock(&obd_dev_lock);
332         obd_device_free(obd);
333
334         class_put_type(obd_type);
335 }
336
337 int class_name2dev(const char *name)
338 {
339         int i;
340
341         if (!name)
342                 return -1;
343
344         spin_lock(&obd_dev_lock);
345         for (i = 0; i < class_devno_max(); i++) {
346                 struct obd_device *obd = class_num2obd(i);
347                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
348                         /* Make sure we finished attaching before we give
349                            out any references */
350                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
351                         if (obd->obd_attached) {
352                                 spin_unlock(&obd_dev_lock);
353                                 return i;
354                         }
355                         break;
356                 }
357         }
358         spin_unlock(&obd_dev_lock);
359
360         return -1;
361 }
362
363 struct obd_device *class_name2obd(const char *name)
364 {
365         int dev = class_name2dev(name);
366
367         if (dev < 0 || dev > class_devno_max())
368                 return NULL;
369         return class_num2obd(dev);
370 }
371
372 int class_uuid2dev(struct obd_uuid *uuid)
373 {
374         int i;
375
376         spin_lock(&obd_dev_lock);
377         for (i = 0; i < class_devno_max(); i++) {
378                 struct obd_device *obd = class_num2obd(i);
379                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
380                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
381                         spin_unlock(&obd_dev_lock);
382                         return i;
383                 }
384         }
385         spin_unlock(&obd_dev_lock);
386
387         return -1;
388 }
389
390 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
391 {
392         int dev = class_uuid2dev(uuid);
393         if (dev < 0)
394                 return NULL;
395         return class_num2obd(dev);
396 }
397
398 struct obd_device *class_num2obd(int num)
399 {
400         struct obd_device *obd = NULL;
401
402         if (num < class_devno_max()) {
403                 obd = obd_devs[num];
404                 if (obd == NULL) {
405                         return NULL;
406                 }
407
408                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
409                          "%p obd_magic %08x != %08x\n",
410                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
411                 LASSERTF(obd->obd_minor == num,
412                          "%p obd_minor %0d != %0d\n",
413                          obd, obd->obd_minor, num);
414         }
415
416         return obd;
417 }
418
419 void class_obd_list(void)
420 {
421         char *status;
422         int i;
423
424         spin_lock(&obd_dev_lock);
425         for (i = 0; i < class_devno_max(); i++) {
426                 struct obd_device *obd = class_num2obd(i);
427                 if (obd == NULL)
428                         continue;
429                 if (obd->obd_stopping)
430                         status = "ST";
431                 else if (obd->obd_set_up)
432                         status = "UP";
433                 else if (obd->obd_attached)
434                         status = "AT";
435                 else
436                         status = "--";
437                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
438                          i, status, obd->obd_type->typ_name,
439                          obd->obd_name, obd->obd_uuid.uuid,
440                          atomic_read(&obd->obd_refcount));
441         }
442         spin_unlock(&obd_dev_lock);
443         return;
444 }
445
446 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
447    specified, then only the client with that uuid is returned,
448    otherwise any client connected to the tgt is returned. */
449 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
450                                           const char * typ_name,
451                                           struct obd_uuid *grp_uuid)
452 {
453         int i;
454
455         spin_lock(&obd_dev_lock);
456         for (i = 0; i < class_devno_max(); i++) {
457                 struct obd_device *obd = class_num2obd(i);
458                 if (obd == NULL)
459                         continue;
460                 if ((strncmp(obd->obd_type->typ_name, typ_name,
461                              strlen(typ_name)) == 0)) {
462                         if (obd_uuid_equals(tgt_uuid,
463                                             &obd->u.cli.cl_target_uuid) &&
464                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
465                                                          &obd->obd_uuid) : 1)) {
466                                 spin_unlock(&obd_dev_lock);
467                                 return obd;
468                         }
469                 }
470         }
471         spin_unlock(&obd_dev_lock);
472
473         return NULL;
474 }
475
476 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
477                                             struct obd_uuid *grp_uuid)
478 {
479         struct obd_device *obd;
480
481         obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
482         if (!obd)
483                 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
484                                             grp_uuid);
485         return obd;
486 }
487
488 /* Iterate the obd_device list looking devices have grp_uuid. Start
489    searching at *next, and if a device is found, the next index to look
490    at is saved in *next. If next is NULL, then the first matching device
491    will always be returned. */
492 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
493 {
494         int i;
495
496         if (next == NULL)
497                 i = 0;
498         else if (*next >= 0 && *next < class_devno_max())
499                 i = *next;
500         else
501                 return NULL;
502
503         spin_lock(&obd_dev_lock);
504         for (; i < class_devno_max(); i++) {
505                 struct obd_device *obd = class_num2obd(i);
506                 if (obd == NULL)
507                         continue;
508                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
509                         if (next != NULL)
510                                 *next = i+1;
511                         spin_unlock(&obd_dev_lock);
512                         return obd;
513                 }
514         }
515         spin_unlock(&obd_dev_lock);
516
517         return NULL;
518 }
519
520
521 void obd_cleanup_caches(void)
522 {
523         int rc;
524
525         ENTRY;
526         if (obd_device_cachep) {
527                 rc = cfs_mem_cache_destroy(obd_device_cachep);
528                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
529                 obd_device_cachep = NULL;
530         }
531         if (obdo_cachep) {
532                 rc = cfs_mem_cache_destroy(obdo_cachep);
533                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
534                 obdo_cachep = NULL;
535         }
536         if (import_cachep) {
537                 rc = cfs_mem_cache_destroy(import_cachep);
538                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
539                 import_cachep = NULL;
540         }
541         if (capa_cachep) {
542                 rc = cfs_mem_cache_destroy(capa_cachep);
543                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
544                 capa_cachep = NULL;
545         }
546         EXIT;
547 }
548
549 int obd_init_caches(void)
550 {
551         ENTRY;
552
553         LASSERT(obd_device_cachep == NULL);
554         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
555                                                  sizeof(struct obd_device), 
556                                                  0, 0);
557         if (!obd_device_cachep)
558                 GOTO(out, -ENOMEM);
559
560         LASSERT(obdo_cachep == NULL);
561         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
562                                            0, 0);
563         if (!obdo_cachep)
564                 GOTO(out, -ENOMEM);
565
566         LASSERT(import_cachep == NULL);
567         import_cachep = cfs_mem_cache_create("ll_import_cache",
568                                              sizeof(struct obd_import),
569                                              0, 0);
570         if (!import_cachep)
571                 GOTO(out, -ENOMEM);
572
573         LASSERT(capa_cachep == NULL);
574         capa_cachep = cfs_mem_cache_create("capa_cache",
575                                            sizeof(struct obd_capa), 0, 0);
576         if (!capa_cachep)
577                 GOTO(out, -ENOMEM);
578
579         RETURN(0);
580  out:
581         obd_cleanup_caches();
582         RETURN(-ENOMEM);
583
584 }
585
586 /* map connection to client */
587 struct obd_export *class_conn2export(struct lustre_handle *conn)
588 {
589         struct obd_export *export;
590         ENTRY;
591
592         if (!conn) {
593                 CDEBUG(D_CACHE, "looking for null handle\n");
594                 RETURN(NULL);
595         }
596
597         if (conn->cookie == -1) {  /* this means assign a new connection */
598                 CDEBUG(D_CACHE, "want a new connection\n");
599                 RETURN(NULL);
600         }
601
602         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
603         export = class_handle2object(conn->cookie);
604         RETURN(export);
605 }
606
607 struct obd_device *class_exp2obd(struct obd_export *exp)
608 {
609         if (exp)
610                 return exp->exp_obd;
611         return NULL;
612 }
613
614 struct obd_device *class_conn2obd(struct lustre_handle *conn)
615 {
616         struct obd_export *export;
617         export = class_conn2export(conn);
618         if (export) {
619                 struct obd_device *obd = export->exp_obd;
620                 class_export_put(export);
621                 return obd;
622         }
623         return NULL;
624 }
625
626 struct obd_import *class_exp2cliimp(struct obd_export *exp)
627 {
628         struct obd_device *obd = exp->exp_obd;
629         if (obd == NULL)
630                 return NULL;
631         return obd->u.cli.cl_import;
632 }
633
634 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
635 {
636         struct obd_device *obd = class_conn2obd(conn);
637         if (obd == NULL)
638                 return NULL;
639         return obd->u.cli.cl_import;
640 }
641
642 /* Export management functions */
643 static void export_handle_addref(void *export)
644 {
645         class_export_get(export);
646 }
647
648 void __class_export_put(struct obd_export *exp)
649 {
650         if (atomic_dec_and_test(&exp->exp_refcount)) {
651                 LASSERT (list_empty(&exp->exp_obd_chain));
652
653                 CDEBUG(D_IOCTL, "final put %p/%s\n",
654                        exp, exp->exp_client_uuid.uuid);
655         
656                 spin_lock(&obd_zombie_impexp_lock);
657                 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
658                 spin_unlock(&obd_zombie_impexp_lock);
659
660                 if (obd_zombie_impexp_notify != NULL)
661                         obd_zombie_impexp_notify();
662         }
663 }
664 EXPORT_SYMBOL(__class_export_put);
665
666 void class_export_destroy(struct obd_export *exp)
667 {
668         struct obd_device *obd = exp->exp_obd;
669         ENTRY;
670
671         LASSERT (atomic_read(&exp->exp_refcount) == 0);
672
673         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
674                exp->exp_client_uuid.uuid, obd->obd_name);
675
676         LASSERT(obd != NULL);
677
678         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
679         if (exp->exp_connection)
680                 ptlrpc_put_connection_superhack(exp->exp_connection);
681
682         LASSERT(list_empty(&exp->exp_outstanding_replies));
683         LASSERT(list_empty(&exp->exp_req_replay_queue));
684         obd_destroy_export(exp);
685  
686         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
687         class_decref(obd);
688         EXIT;
689 }
690
691 /* Creates a new export, adds it to the hash table, and returns a
692  * pointer to it. The refcount is 2: one for the hash reference, and
693  * one for the pointer returned by this function. */
694 struct obd_export *class_new_export(struct obd_device *obd,
695                                     struct obd_uuid *cluuid)
696 {
697         struct obd_export *export;
698         int rc = 0;
699
700         OBD_ALLOC_PTR(export);
701         if (!export)
702                 return ERR_PTR(-ENOMEM);
703
704         export->exp_conn_cnt = 0;
705         atomic_set(&export->exp_refcount, 2);
706         atomic_set(&export->exp_rpc_count, 0);
707         export->exp_obd = obd;
708         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
709         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
710         /* XXX this should be in LDLM init */
711         CFS_INIT_LIST_HEAD(&export->exp_ldlm_data.led_held_locks);
712         spin_lock_init(&export->exp_ldlm_data.led_lock);
713
714         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
715         class_handle_hash(&export->exp_handle, export_handle_addref);
716         export->exp_last_request_time = cfs_time_current_sec();
717         spin_lock_init(&export->exp_lock);
718         INIT_HLIST_NODE(&export->exp_uuid_hash);
719         INIT_HLIST_NODE(&export->exp_nid_hash);
720
721         export->exp_sp_peer = LUSTRE_SP_ANY;
722         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
723         export->exp_client_uuid = *cluuid;
724         obd_init_export(export);
725
726         spin_lock(&obd->obd_dev_lock);
727         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
728                rc = lustre_hash_additem_unique(obd->obd_uuid_hash_body, cluuid, 
729                                                &export->exp_uuid_hash);
730                if (rc != 0) {
731                        CWARN("%s: denying duplicate export for %s\n",
732                              obd->obd_name, cluuid->uuid);
733                        spin_unlock(&obd->obd_dev_lock);
734                        class_handle_unhash(&export->exp_handle);
735                        OBD_FREE_PTR(export);
736                        return ERR_PTR(-EALREADY);
737                }
738         }
739
740         LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
741         class_incref(obd);
742         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
743         list_add_tail(&export->exp_obd_chain_timed,
744                       &export->exp_obd->obd_exports_timed);
745         export->exp_obd->obd_num_exports++;
746         spin_unlock(&obd->obd_dev_lock);
747
748         return export;
749 }
750 EXPORT_SYMBOL(class_new_export);
751
752 void class_unlink_export(struct obd_export *exp)
753 {
754         class_handle_unhash(&exp->exp_handle);
755
756         spin_lock(&exp->exp_obd->obd_dev_lock);
757         /* delete an uuid-export hashitem from hashtables */
758         if (!hlist_unhashed(&exp->exp_uuid_hash)) {
759                 lustre_hash_delitem(exp->exp_obd->obd_uuid_hash_body, 
760                                     &exp->exp_client_uuid, &exp->exp_uuid_hash);
761         }
762         list_del_init(&exp->exp_obd_chain);
763         list_del_init(&exp->exp_obd_chain_timed);
764         exp->exp_obd->obd_num_exports--;
765         spin_unlock(&exp->exp_obd->obd_dev_lock);
766
767         class_export_put(exp);
768 }
769 EXPORT_SYMBOL(class_unlink_export);
770
771 /* Import management functions */
772 static void import_handle_addref(void *import)
773 {
774         class_import_get(import);
775 }
776
777 struct obd_import *class_import_get(struct obd_import *import)
778 {
779         LASSERT(atomic_read(&import->imp_refcount) >= 0);
780         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
781         atomic_inc(&import->imp_refcount);
782         CDEBUG(D_INFO, "import %p refcount=%d\n", import,
783                atomic_read(&import->imp_refcount));
784         return import;
785 }
786 EXPORT_SYMBOL(class_import_get);
787
788 void class_import_put(struct obd_import *import)
789 {
790         ENTRY;
791
792         CDEBUG(D_INFO, "import %p refcount=%d\n", import,
793                atomic_read(&import->imp_refcount) - 1);
794
795         LASSERT(atomic_read(&import->imp_refcount) > 0);
796         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
797         LASSERT(list_empty(&import->imp_zombie_chain));
798
799         if (atomic_dec_and_test(&import->imp_refcount)) {
800
801                 CDEBUG(D_INFO, "final put import %p\n", import);
802                 
803                 spin_lock(&obd_zombie_impexp_lock);
804                 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
805                 spin_unlock(&obd_zombie_impexp_lock);
806
807                 if (obd_zombie_impexp_notify != NULL)
808                         obd_zombie_impexp_notify();
809         }
810
811         EXIT;
812 }
813 EXPORT_SYMBOL(class_import_put);
814
815 void class_import_destroy(struct obd_import *import)
816 {
817         ENTRY;
818         
819         CDEBUG(D_IOCTL, "destroying import %p for %s\n", import,
820                 import->imp_obd->obd_name);
821
822         LASSERT(atomic_read(&import->imp_refcount) == 0);
823
824         ptlrpc_put_connection_superhack(import->imp_connection);
825
826         while (!list_empty(&import->imp_conn_list)) {
827                 struct obd_import_conn *imp_conn;
828
829                 imp_conn = list_entry(import->imp_conn_list.next,
830                                       struct obd_import_conn, oic_item);
831                 list_del(&imp_conn->oic_item);
832                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
833                 OBD_FREE(imp_conn, sizeof(*imp_conn));
834         }
835
836         LASSERT(import->imp_sec == NULL);
837         class_decref(import->imp_obd);
838         OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
839         EXIT;
840 }
841
842 static void init_imp_at(struct imp_at *at) {
843         int i;
844         at_init(&at->iat_net_latency, 0, 0);
845         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
846                 /* max service estimates are tracked on the server side, so
847                    don't use the AT history here, just use the last reported
848                    val. (But keep hist for proc histogram, worst_ever) */
849                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
850                         AT_FLG_NOHIST);
851         }
852 }
853
854 struct obd_import *class_new_import(struct obd_device *obd)
855 {
856         struct obd_import *imp;
857
858         OBD_ALLOC(imp, sizeof(*imp));
859         if (imp == NULL)
860                 return NULL;
861
862         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
863         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
864         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
865         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
866         spin_lock_init(&imp->imp_lock);
867         imp->imp_last_success_conn = 0;
868         imp->imp_state = LUSTRE_IMP_NEW;
869         imp->imp_obd = class_incref(obd);
870         sema_init(&imp->imp_sec_mutex, 1);
871         cfs_waitq_init(&imp->imp_recovery_waitq);
872
873         atomic_set(&imp->imp_refcount, 2);
874         atomic_set(&imp->imp_inflight, 0);
875         atomic_set(&imp->imp_replay_inflight, 0);
876         atomic_set(&imp->imp_inval_count, 0);
877         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
878         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
879         class_handle_hash(&imp->imp_handle, import_handle_addref);
880         init_imp_at(&imp->imp_at);
881
882         /* the default magic is V2, will be used in connect RPC, and
883          * then adjusted according to the flags in request/reply. */
884         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
885
886         return imp;
887 }
888 EXPORT_SYMBOL(class_new_import);
889
890 void class_destroy_import(struct obd_import *import)
891 {
892         LASSERT(import != NULL);
893         LASSERT(import != LP_POISON);
894
895         class_handle_unhash(&import->imp_handle);
896
897         spin_lock(&import->imp_lock);
898         import->imp_generation++;
899         spin_unlock(&import->imp_lock);
900         class_import_put(import);
901 }
902 EXPORT_SYMBOL(class_destroy_import);
903
904 /* A connection defines an export context in which preallocation can
905    be managed. This releases the export pointer reference, and returns
906    the export handle, so the export refcount is 1 when this function
907    returns. */
908 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
909                   struct obd_uuid *cluuid)
910 {
911         struct obd_export *export;
912         LASSERT(conn != NULL);
913         LASSERT(obd != NULL);
914         LASSERT(cluuid != NULL);
915         ENTRY;
916
917         export = class_new_export(obd, cluuid);
918         if (IS_ERR(export))
919                 RETURN(PTR_ERR(export));
920
921         conn->cookie = export->exp_handle.h_cookie;
922         class_export_put(export);
923
924         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
925                cluuid->uuid, conn->cookie);
926         RETURN(0);
927 }
928 EXPORT_SYMBOL(class_connect);
929
930 /* if export is involved in recovery then clean up related things */
931 void class_export_recovery_cleanup(struct obd_export *exp)
932 {
933         struct obd_device *obd = exp->exp_obd;
934
935         spin_lock_bh(&obd->obd_processing_task_lock);
936         if (obd->obd_recovering && exp->exp_in_recovery) {
937                 spin_lock(&exp->exp_lock);
938                 exp->exp_in_recovery = 0;
939                 spin_unlock(&exp->exp_lock);
940                 obd->obd_connected_clients--;
941                 /* each connected client is counted as recoverable */
942                 obd->obd_recoverable_clients--;
943                 if (exp->exp_req_replay_needed) {
944                         spin_lock(&exp->exp_lock);
945                         exp->exp_req_replay_needed = 0;
946                         spin_unlock(&exp->exp_lock);
947                         LASSERT(atomic_read(&obd->obd_req_replay_clients));
948                         atomic_dec(&obd->obd_req_replay_clients);
949                 }
950                 if (exp->exp_lock_replay_needed) {
951                         spin_lock(&exp->exp_lock);
952                         exp->exp_lock_replay_needed = 0;
953                         spin_unlock(&exp->exp_lock);
954                         LASSERT(atomic_read(&obd->obd_lock_replay_clients));
955                         atomic_dec(&obd->obd_lock_replay_clients);                
956                 }
957         }
958         spin_unlock_bh(&obd->obd_processing_task_lock);
959 }
960
961 /* This function removes two references from the export: one for the
962  * hash entry and one for the export pointer passed in.  The export
963  * pointer passed to this function is destroyed should not be used
964  * again. */
965 int class_disconnect(struct obd_export *export)
966 {
967         int already_disconnected;
968         ENTRY;
969
970         if (export == NULL) {
971                 fixme();
972                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
973                 RETURN(-EINVAL);
974         }
975
976         spin_lock(&export->exp_lock);
977         already_disconnected = export->exp_disconnected;
978         export->exp_disconnected = 1;
979
980         if (!hlist_unhashed(&export->exp_nid_hash)) {
981                 lustre_hash_delitem(export->exp_obd->obd_nid_hash_body,
982                                     &export->exp_connection->c_peer.nid, &export->exp_nid_hash);
983         }
984         spin_unlock(&export->exp_lock);
985
986         /* class_cleanup(), abort_recovery(), and class_fail_export()
987          * all end up in here, and if any of them race we shouldn't
988          * call extra class_export_puts(). */
989         if (already_disconnected)
990                 RETURN(0);
991
992         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
993                export->exp_handle.h_cookie);
994
995         class_export_recovery_cleanup(export);
996         class_unlink_export(export);
997         class_export_put(export);
998         RETURN(0);
999 }
1000
1001 static void class_disconnect_export_list(struct list_head *list, int flags)
1002 {
1003         int rc;
1004         struct lustre_handle fake_conn;
1005         struct obd_export *fake_exp, *exp;
1006         ENTRY;
1007
1008         /* It's possible that an export may disconnect itself, but
1009          * nothing else will be added to this list. */
1010         while (!list_empty(list)) {
1011                 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1012                 class_export_get(exp);
1013
1014                 spin_lock(&exp->exp_lock);
1015                 exp->exp_flags = flags;
1016                 spin_unlock(&exp->exp_lock);
1017
1018                 if (obd_uuid_equals(&exp->exp_client_uuid,
1019                                     &exp->exp_obd->obd_uuid)) {
1020                         CDEBUG(D_HA,
1021                                "exp %p export uuid == obd uuid, don't discon\n",
1022                                exp);
1023                         /* Need to delete this now so we don't end up pointing
1024                          * to work_list later when this export is cleaned up. */
1025                         list_del_init(&exp->exp_obd_chain);
1026                         class_export_put(exp);
1027                         continue;
1028                 }
1029
1030                 fake_conn.cookie = exp->exp_handle.h_cookie;
1031                 fake_exp = class_conn2export(&fake_conn);
1032                 if (!fake_exp) {
1033                         class_export_put(exp);
1034                         continue;
1035                 }
1036
1037                 spin_lock(&fake_exp->exp_lock);
1038                 fake_exp->exp_flags = flags;
1039                 spin_unlock(&fake_exp->exp_lock);
1040
1041                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1042                        "last request at %ld\n",
1043                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1044                        exp, exp->exp_last_request_time);
1045                 rc = obd_disconnect(fake_exp);
1046                 class_export_put(exp);
1047         }
1048         EXIT;
1049 }
1050
1051 static inline int get_exp_flags_from_obd(struct obd_device *obd)
1052 {
1053         return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
1054                 (obd->obd_force ? OBD_OPT_FORCE : 0));
1055 }
1056
1057 void class_disconnect_exports(struct obd_device *obd)
1058 {
1059         struct list_head work_list;
1060         ENTRY;
1061
1062         /* Move all of the exports from obd_exports to a work list, en masse. */
1063         spin_lock(&obd->obd_dev_lock);
1064         list_add(&work_list, &obd->obd_exports);
1065         list_del_init(&obd->obd_exports);
1066         spin_unlock(&obd->obd_dev_lock);
1067         
1068         if (!list_empty(&work_list)) {
1069                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1070                        "disconnecting them\n", obd->obd_minor, obd);
1071                 class_disconnect_export_list(&work_list, 
1072                                              get_exp_flags_from_obd(obd));
1073         } else
1074                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1075                        obd->obd_minor, obd);
1076         EXIT;
1077 }
1078 EXPORT_SYMBOL(class_disconnect_exports);
1079
1080 /* Remove exports that have not completed recovery.
1081  */
1082 int class_disconnect_stale_exports(struct obd_device *obd,
1083                                    int (*test_export)(struct obd_export *))
1084 {
1085         struct list_head work_list;
1086         struct list_head *pos, *n;
1087         struct obd_export *exp;
1088         int cnt = 0;
1089         ENTRY;
1090
1091         CFS_INIT_LIST_HEAD(&work_list);
1092         spin_lock(&obd->obd_dev_lock);
1093         list_for_each_safe(pos, n, &obd->obd_exports) {
1094                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1095                 if (test_export(exp))
1096                         continue;
1097                 
1098                 list_del(&exp->exp_obd_chain);
1099                 list_add(&exp->exp_obd_chain, &work_list);
1100                 /* don't count self-export as client */
1101                 if (obd_uuid_equals(&exp->exp_client_uuid,
1102                                      &exp->exp_obd->obd_uuid))
1103                         continue;
1104
1105                 cnt++;
1106                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1107                        obd->obd_name, exp->exp_client_uuid.uuid,
1108                        exp->exp_connection == NULL ? "<unknown>" :
1109                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1110         }
1111         spin_unlock(&obd->obd_dev_lock);
1112
1113         CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
1114                obd->obd_name, cnt);
1115         class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
1116         RETURN(cnt);
1117 }
1118 EXPORT_SYMBOL(class_disconnect_stale_exports);
1119
1120 int oig_init(struct obd_io_group **oig_out)
1121 {
1122         struct obd_io_group *oig;
1123         ENTRY;
1124
1125         OBD_ALLOC(oig, sizeof(*oig));
1126         if (oig == NULL)
1127                 RETURN(-ENOMEM);
1128
1129         spin_lock_init(&oig->oig_lock);
1130         oig->oig_rc = 0;
1131         oig->oig_pending = 0;
1132         atomic_set(&oig->oig_refcount, 1);
1133         cfs_waitq_init(&oig->oig_waitq);
1134         CFS_INIT_LIST_HEAD(&oig->oig_occ_list);
1135
1136         *oig_out = oig;
1137         RETURN(0);
1138 };
1139 EXPORT_SYMBOL(oig_init);
1140
1141 static inline void oig_grab(struct obd_io_group *oig)
1142 {
1143         atomic_inc(&oig->oig_refcount);
1144 }
1145
1146 void oig_release(struct obd_io_group *oig)
1147 {
1148         if (atomic_dec_and_test(&oig->oig_refcount))
1149                 OBD_FREE(oig, sizeof(*oig));
1150 }
1151 EXPORT_SYMBOL(oig_release);
1152
1153 int oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ)
1154 {
1155         int rc = 0;
1156         CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
1157         spin_lock(&oig->oig_lock);
1158         if (oig->oig_rc) {
1159                 rc = oig->oig_rc;
1160         } else {
1161                 oig->oig_pending++;
1162                 if (occ != NULL)
1163                         list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
1164         }
1165         spin_unlock(&oig->oig_lock);
1166         oig_grab(oig);
1167
1168         return rc;
1169 }
1170 EXPORT_SYMBOL(oig_add_one);
1171
1172 void oig_complete_one(struct obd_io_group *oig,
1173                       struct oig_callback_context *occ, int rc)
1174 {
1175         cfs_waitq_t *wake = NULL;
1176         int old_rc;
1177
1178         spin_lock(&oig->oig_lock);
1179
1180         if (occ != NULL)
1181                 list_del_init(&occ->occ_oig_item);
1182
1183         old_rc = oig->oig_rc;
1184         if (oig->oig_rc == 0 && rc != 0)
1185                 oig->oig_rc = rc;
1186
1187         if (--oig->oig_pending <= 0)
1188                 wake = &oig->oig_waitq;
1189
1190         spin_unlock(&oig->oig_lock);
1191
1192         CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
1193                         "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
1194                         oig->oig_pending);
1195         if (wake)
1196                 cfs_waitq_signal(wake);
1197         oig_release(oig);
1198 }
1199 EXPORT_SYMBOL(oig_complete_one);
1200
1201 static int oig_done(struct obd_io_group *oig)
1202 {
1203         int rc = 0;
1204         spin_lock(&oig->oig_lock);
1205         if (oig->oig_pending <= 0)
1206                 rc = 1;
1207         spin_unlock(&oig->oig_lock);
1208         return rc;
1209 }
1210
1211 static void interrupted_oig(void *data)
1212 {
1213         struct obd_io_group *oig = data;
1214         struct oig_callback_context *occ;
1215
1216         spin_lock(&oig->oig_lock);
1217         /* We need to restart the processing each time we drop the lock, as
1218          * it is possible other threads called oig_complete_one() to remove
1219          * an entry elsewhere in the list while we dropped lock.  We need to
1220          * drop the lock because osc_ap_completion() calls oig_complete_one()
1221          * which re-gets this lock ;-) as well as a lock ordering issue. */
1222 restart:
1223         list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
1224                 if (occ->interrupted)
1225                         continue;
1226                 occ->interrupted = 1;
1227                 spin_unlock(&oig->oig_lock);
1228                 occ->occ_interrupted(occ);
1229                 spin_lock(&oig->oig_lock);
1230                 goto restart;
1231         }
1232         spin_unlock(&oig->oig_lock);
1233 }
1234
1235 int oig_wait(struct obd_io_group *oig)
1236 {
1237         struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
1238         int rc;
1239
1240         CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
1241
1242         do {
1243                 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
1244                 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
1245                 /* we can't continue until the oig has emptied and stopped
1246                  * referencing state that the caller will free upon return */
1247                 if (rc == -EINTR)
1248                         lwi = (struct l_wait_info){ 0, };
1249         } while (rc == -EINTR);
1250
1251         LASSERTF(oig->oig_pending == 0,
1252                  "exiting oig_wait(oig = %p) with %d pending\n", oig,
1253                  oig->oig_pending);
1254
1255         CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
1256         return oig->oig_rc;
1257 }
1258 EXPORT_SYMBOL(oig_wait);
1259
1260 void class_fail_export(struct obd_export *exp)
1261 {
1262         int rc, already_failed;
1263
1264         spin_lock(&exp->exp_lock);
1265         already_failed = exp->exp_failed;
1266         exp->exp_failed = 1;
1267         spin_unlock(&exp->exp_lock);
1268
1269         if (already_failed) {
1270                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1271                        exp, exp->exp_client_uuid.uuid);
1272                 return;
1273         }
1274
1275         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1276                exp, exp->exp_client_uuid.uuid);
1277
1278         if (obd_dump_on_timeout)
1279                 libcfs_debug_dumplog();
1280
1281         /* Most callers into obd_disconnect are removing their own reference
1282          * (request, for example) in addition to the one from the hash table.
1283          * We don't have such a reference here, so make one. */
1284         class_export_get(exp);
1285         rc = obd_disconnect(exp);
1286         if (rc)
1287                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1288         else
1289                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1290                        exp, exp->exp_client_uuid.uuid);
1291 }
1292 EXPORT_SYMBOL(class_fail_export);
1293
1294 char *obd_export_nid2str(struct obd_export *exp)
1295 {
1296         if (exp->exp_connection != NULL)
1297                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1298
1299         return "(no nid)";
1300 }
1301 EXPORT_SYMBOL(obd_export_nid2str);
1302
1303 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1304 {
1305         struct obd_export *doomed_exp = NULL;
1306         int exports_evicted = 0;
1307
1308         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1309
1310         do {
1311                 doomed_exp = lustre_hash_get_object_by_key(obd->obd_nid_hash_body,
1312                                                            &nid_key);
1313                 if (doomed_exp == NULL)
1314                         break;
1315
1316                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1317                          "nid %s found, wanted nid %s, requested nid %s\n",
1318                          obd_export_nid2str(doomed_exp),
1319                          libcfs_nid2str(nid_key), nid);        
1320                 LASSERTF(doomed_exp != obd->obd_self_export,
1321                          "self-export is hashed by NID?\n");
1322                 exports_evicted++;
1323                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1324                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1325                        exports_evicted);
1326                 class_fail_export(doomed_exp);
1327                 class_export_put(doomed_exp);
1328         } while (1);
1329
1330         if (!exports_evicted)
1331                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1332                        obd->obd_name, nid);
1333         return exports_evicted;
1334 }
1335 EXPORT_SYMBOL(obd_export_evict_by_nid);
1336
1337 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1338 {
1339         struct obd_export *doomed_exp = NULL;
1340         struct obd_uuid doomed;
1341         int exports_evicted = 0;
1342
1343         obd_str2uuid(&doomed, uuid);
1344         if (obd_uuid_equals(&doomed, &obd->obd_uuid)) {
1345                 CERROR("%s: can't evict myself\n", obd->obd_name);
1346                 return exports_evicted;
1347         }
1348
1349         doomed_exp = lustre_hash_get_object_by_key(obd->obd_uuid_hash_body, 
1350                                                    &doomed);
1351
1352         if (doomed_exp == NULL) {
1353                 CERROR("%s: can't disconnect %s: no exports found\n",
1354                        obd->obd_name, uuid);
1355         } else {
1356                 CWARN("%s: evicting %s at adminstrative request\n",
1357                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1358                 class_fail_export(doomed_exp);
1359                 class_export_put(doomed_exp);
1360                 exports_evicted++;
1361         }
1362
1363         return exports_evicted;
1364 }
1365 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1366
1367 /**
1368  * kill zombie imports and exports
1369  */
1370 void obd_zombie_impexp_cull(void)
1371 {
1372         struct obd_import *import;
1373         struct obd_export *export;
1374         ENTRY;
1375
1376         do {
1377                 spin_lock (&obd_zombie_impexp_lock);
1378
1379                 import = NULL;
1380                 if (!list_empty(&obd_zombie_imports)) {
1381                         import = list_entry(obd_zombie_imports.next,
1382                                             struct obd_import,
1383                                             imp_zombie_chain);
1384                         list_del(&import->imp_zombie_chain);
1385                 }
1386
1387                 export = NULL;
1388                 if (!list_empty(&obd_zombie_exports)) {
1389                         export = list_entry(obd_zombie_exports.next,
1390                                             struct obd_export,
1391                                             exp_obd_chain);
1392                         list_del_init(&export->exp_obd_chain);
1393                 }
1394
1395                 spin_unlock(&obd_zombie_impexp_lock);
1396
1397                 if (import != NULL)
1398                         class_import_destroy(import);
1399
1400                 if (export != NULL)
1401                         class_export_destroy(export);
1402
1403         } while (import != NULL || export != NULL);
1404         EXIT;
1405 }
1406
1407 static struct completion        obd_zombie_start;
1408 static struct completion        obd_zombie_stop;
1409 static unsigned long            obd_zombie_flags;
1410 static cfs_waitq_t              obd_zombie_waitq;
1411
1412 enum {
1413         OBD_ZOMBIE_STOP = 1
1414 };
1415
1416 /**
1417  * check for work for kill zombie import/export thread.
1418  */
1419 int obd_zombie_impexp_check(void *arg)
1420 {
1421         int rc;
1422
1423         spin_lock(&obd_zombie_impexp_lock);
1424         rc = list_empty(&obd_zombie_imports) &&
1425              list_empty(&obd_zombie_exports) &&
1426              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1427
1428         spin_unlock(&obd_zombie_impexp_lock);
1429
1430         RETURN(rc);
1431 }
1432
1433 /**
1434  * notify import/export destroy thread about new zombie.
1435  */
1436 static void obd_zombie_impexp_notify(void)
1437 {
1438         cfs_waitq_signal(&obd_zombie_waitq);
1439 }
1440
1441 #ifdef __KERNEL__
1442
1443 /**
1444  * destroy zombie export/import thread.
1445  */
1446 static int obd_zombie_impexp_thread(void *unused)
1447 {
1448         int rc;
1449
1450         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1451                 complete(&obd_zombie_start);
1452                 RETURN(rc);
1453         }
1454
1455         complete(&obd_zombie_start);
1456
1457         while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1458                 struct l_wait_info lwi = { 0 };
1459
1460                 l_wait_event(obd_zombie_waitq, !obd_zombie_impexp_check(NULL), &lwi);
1461
1462                 obd_zombie_impexp_cull();
1463         }
1464
1465         complete(&obd_zombie_stop);
1466
1467         RETURN(0);
1468 }
1469
1470 #else /* ! KERNEL */
1471
1472 static atomic_t zombie_recur = ATOMIC_INIT(0);
1473 static void *obd_zombie_impexp_work_cb;
1474 static void *obd_zombie_impexp_idle_cb;
1475
1476 int obd_zombie_impexp_kill(void *arg)
1477 {
1478         int rc = 0;
1479
1480         if (atomic_inc_return(&zombie_recur) == 1) {
1481                 obd_zombie_impexp_cull();
1482                 rc = 1;
1483         }
1484         atomic_dec(&zombie_recur);
1485         return rc;
1486 }
1487
1488 #endif
1489
1490 /**
1491  * start destroy zombie import/export thread
1492  */
1493 int obd_zombie_impexp_init(void)
1494 {
1495         int rc;
1496
1497         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1498         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1499         spin_lock_init(&obd_zombie_impexp_lock);
1500         init_completion(&obd_zombie_start);
1501         init_completion(&obd_zombie_stop);
1502         cfs_waitq_init(&obd_zombie_waitq);
1503
1504 #ifdef __KERNEL__
1505         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1506         if (rc < 0)
1507                 RETURN(rc);
1508
1509         wait_for_completion(&obd_zombie_start);
1510 #else
1511
1512         obd_zombie_impexp_work_cb =
1513                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1514                                                  &obd_zombie_impexp_kill, NULL);
1515
1516         obd_zombie_impexp_idle_cb =
1517                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1518                                                  &obd_zombie_impexp_check, NULL);
1519         rc = 0;
1520
1521 #endif
1522         RETURN(rc);
1523 }
1524 /**
1525  * stop destroy zombie import/export thread
1526  */
1527 void obd_zombie_impexp_stop(void)
1528 {
1529         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1530         obd_zombie_impexp_notify();
1531 #ifdef __KERNEL__
1532         wait_for_completion(&obd_zombie_stop);
1533 #else
1534         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1535         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1536 #endif
1537 }
1538