Whamcloud - gitweb
b=16098
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see [sun.com URL with a
20  * copy of GPLv2].
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
50
51 extern struct list_head obd_types;
52 spinlock_t obd_types_lock;
53
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
58
59 struct list_head  obd_zombie_imports;
60 struct list_head  obd_zombie_exports;
61 spinlock_t        obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
63
64 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
65
66 /*
67  * support functions: we could use inter-module communication, but this
68  * is more portable to other OS's
69  */
70 static struct obd_device *obd_device_alloc(void)
71 {
72         struct obd_device *obd;
73
74         OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
75         if (obd != NULL) {
76                 obd->obd_magic = OBD_DEVICE_MAGIC;
77         }
78         return obd;
79 }
80 EXPORT_SYMBOL(obd_device_alloc);
81
82 static void obd_device_free(struct obd_device *obd)
83 {
84         LASSERT(obd != NULL);
85         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n", 
86                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
87         if (obd->obd_namespace != NULL) {
88                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n", 
89                        obd, obd->obd_namespace, obd->obd_force);
90                 LBUG();
91         }
92         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
93 }
94 EXPORT_SYMBOL(obd_device_free);
95
96 struct obd_type *class_search_type(const char *name)
97 {
98         struct list_head *tmp;
99         struct obd_type *type;
100
101         spin_lock(&obd_types_lock);
102         list_for_each(tmp, &obd_types) {
103                 type = list_entry(tmp, struct obd_type, typ_chain);
104                 if (strcmp(type->typ_name, name) == 0) {
105                         spin_unlock(&obd_types_lock);
106                         return type;
107                 }
108         }
109         spin_unlock(&obd_types_lock);
110         return NULL;
111 }
112
113 struct obd_type *class_get_type(const char *name)
114 {
115         struct obd_type *type = class_search_type(name);
116
117 #ifdef CONFIG_KMOD
118         if (!type) {
119                 const char *modname = name;
120                 if (!request_module(modname)) {
121                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
122                         type = class_search_type(name);
123                 } else {
124                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
125                                            modname);
126                 }
127         }
128 #endif
129         if (type) {
130                 spin_lock(&type->obd_type_lock);
131                 type->typ_refcnt++;
132                 try_module_get(type->typ_dt_ops->o_owner);
133                 spin_unlock(&type->obd_type_lock);
134         }
135         return type;
136 }
137
138 void class_put_type(struct obd_type *type)
139 {
140         LASSERT(type);
141         spin_lock(&type->obd_type_lock);
142         type->typ_refcnt--;
143         module_put(type->typ_dt_ops->o_owner);
144         spin_unlock(&type->obd_type_lock);
145 }
146
147 #define CLASS_MAX_NAME 1024
148
149 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops, 
150                         struct lprocfs_vars *vars, const char *name, 
151                         struct lu_device_type *ldt)
152 {
153         struct obd_type *type;
154         int rc = 0;
155         ENTRY;
156
157         /* sanity check */
158         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
159
160         if (class_search_type(name)) {
161                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
162                 RETURN(-EEXIST);
163         }
164
165         rc = -ENOMEM;
166         OBD_ALLOC(type, sizeof(*type));
167         if (type == NULL)
168                 RETURN(rc);
169
170         OBD_ALLOC_PTR(type->typ_dt_ops);
171         OBD_ALLOC_PTR(type->typ_md_ops);
172         OBD_ALLOC(type->typ_name, strlen(name) + 1);
173         
174         if (type->typ_dt_ops == NULL || 
175             type->typ_md_ops == NULL || 
176             type->typ_name == NULL)
177                 GOTO (failed, rc);
178
179         *(type->typ_dt_ops) = *dt_ops;
180         /* md_ops is optional */
181         if (md_ops)
182                 *(type->typ_md_ops) = *md_ops;
183         strcpy(type->typ_name, name);
184         spin_lock_init(&type->obd_type_lock);
185
186 #ifdef LPROCFS
187         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
188                                               vars, type);
189         if (IS_ERR(type->typ_procroot)) {
190                 rc = PTR_ERR(type->typ_procroot);
191                 type->typ_procroot = NULL;
192                 GOTO (failed, rc);
193         }
194 #endif
195         if (ldt != NULL) {
196                 type->typ_lu = ldt;
197                 rc = ldt->ldt_ops->ldto_init(ldt);
198                 if (rc != 0)
199                         GOTO (failed, rc);
200         }
201
202         spin_lock(&obd_types_lock);
203         list_add(&type->typ_chain, &obd_types);
204         spin_unlock(&obd_types_lock);
205
206         RETURN (0);
207
208  failed:
209         if (type->typ_name != NULL)
210                 OBD_FREE(type->typ_name, strlen(name) + 1);
211         if (type->typ_md_ops != NULL)
212                 OBD_FREE_PTR(type->typ_md_ops);
213         if (type->typ_dt_ops != NULL)
214                 OBD_FREE_PTR(type->typ_dt_ops);
215         OBD_FREE(type, sizeof(*type));
216         RETURN(rc);
217 }
218
219 int class_unregister_type(const char *name)
220 {
221         struct obd_type *type = class_search_type(name);
222         ENTRY;
223
224         if (!type) {
225                 CERROR("unknown obd type\n");
226                 RETURN(-EINVAL);
227         }
228
229         if (type->typ_refcnt) {
230                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
231                 /* This is a bad situation, let's make the best of it */
232                 /* Remove ops, but leave the name for debugging */
233                 OBD_FREE_PTR(type->typ_dt_ops);
234                 OBD_FREE_PTR(type->typ_md_ops);
235                 RETURN(-EBUSY);
236         }
237
238         if (type->typ_procroot) {
239                 lprocfs_remove(&type->typ_procroot);
240         }
241
242         if (type->typ_lu)
243                 type->typ_lu->ldt_ops->ldto_fini(type->typ_lu);
244
245         spin_lock(&obd_types_lock);
246         list_del(&type->typ_chain);
247         spin_unlock(&obd_types_lock);
248         OBD_FREE(type->typ_name, strlen(name) + 1);
249         if (type->typ_dt_ops != NULL)
250                 OBD_FREE_PTR(type->typ_dt_ops);
251         if (type->typ_md_ops != NULL)
252                 OBD_FREE_PTR(type->typ_md_ops);
253         OBD_FREE(type, sizeof(*type));
254         RETURN(0);
255 } /* class_unregister_type */
256
257 struct obd_device *class_newdev(const char *type_name, const char *name)
258 {
259         struct obd_device *result = NULL;
260         struct obd_device *newdev;
261         struct obd_type *type = NULL;
262         int i;
263         int new_obd_minor = 0;
264
265         if (strlen(name) > MAX_OBD_NAME) {
266                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
267                 RETURN(ERR_PTR(-EINVAL));
268         }
269
270         type = class_get_type(type_name); 
271         if (type == NULL){
272                 CERROR("OBD: unknown type: %s\n", type_name);
273                 RETURN(ERR_PTR(-ENODEV));
274         }
275
276         newdev = obd_device_alloc();
277         if (newdev == NULL) { 
278                 class_put_type(type);
279                 RETURN(ERR_PTR(-ENOMEM));
280         }
281         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
282
283         spin_lock(&obd_dev_lock);
284         for (i = 0; i < class_devno_max(); i++) {
285                 struct obd_device *obd = class_num2obd(i);
286                 if (obd && obd->obd_name &&
287                     (strcmp(name, obd->obd_name) == 0)) {
288                         CERROR("Device %s already exists, won't add\n", name);
289                         if (result) {
290                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
291                                          "%p obd_magic %08x != %08x\n", result,
292                                          result->obd_magic, OBD_DEVICE_MAGIC);
293                                 LASSERTF(result->obd_minor == new_obd_minor,
294                                          "%p obd_minor %d != %d\n", result,
295                                          result->obd_minor, new_obd_minor);
296
297                                 obd_devs[result->obd_minor] = NULL;
298                                 result->obd_name[0]='\0';
299                          }
300                         result = ERR_PTR(-EEXIST);
301                         break;
302                 }
303                 if (!result && !obd) {
304                         result = newdev;
305                         result->obd_minor = i;
306                         new_obd_minor = i;
307                         result->obd_type = type;
308                         memcpy(result->obd_name, name, strlen(name));
309                         obd_devs[i] = result;
310                 }
311         }
312         spin_unlock(&obd_dev_lock);
313         
314         if (result == NULL && i >= class_devno_max()) {
315                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
316                        class_devno_max());
317                 result = ERR_PTR(-EOVERFLOW);
318         }
319         
320         if (IS_ERR(result)) {
321                 obd_device_free(newdev);
322                 class_put_type(type);
323         } else {
324                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
325                        result->obd_name, result);
326         }
327         return result;
328 }
329
330 void class_release_dev(struct obd_device *obd)
331 {
332         struct obd_type *obd_type = obd->obd_type;
333
334         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
335                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
336         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
337                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
338         LASSERT(obd_type != NULL);
339
340         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
341                obd->obd_name,obd->obd_type->typ_name);
342
343         spin_lock(&obd_dev_lock);
344         obd_devs[obd->obd_minor] = NULL;
345         spin_unlock(&obd_dev_lock);
346         obd_device_free(obd);
347
348         class_put_type(obd_type);
349 }
350
351 int class_name2dev(const char *name)
352 {
353         int i;
354
355         if (!name)
356                 return -1;
357
358         spin_lock(&obd_dev_lock);
359         for (i = 0; i < class_devno_max(); i++) {
360                 struct obd_device *obd = class_num2obd(i);
361                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
362                         /* Make sure we finished attaching before we give
363                            out any references */
364                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
365                         if (obd->obd_attached) {
366                                 spin_unlock(&obd_dev_lock);
367                                 return i;
368                         }
369                         break;
370                 }
371         }
372         spin_unlock(&obd_dev_lock);
373
374         return -1;
375 }
376
377 struct obd_device *class_name2obd(const char *name)
378 {
379         int dev = class_name2dev(name);
380
381         if (dev < 0 || dev > class_devno_max())
382                 return NULL;
383         return class_num2obd(dev);
384 }
385
386 int class_uuid2dev(struct obd_uuid *uuid)
387 {
388         int i;
389
390         spin_lock(&obd_dev_lock);
391         for (i = 0; i < class_devno_max(); i++) {
392                 struct obd_device *obd = class_num2obd(i);
393                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
394                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
395                         spin_unlock(&obd_dev_lock);
396                         return i;
397                 }
398         }
399         spin_unlock(&obd_dev_lock);
400
401         return -1;
402 }
403
404 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
405 {
406         int dev = class_uuid2dev(uuid);
407         if (dev < 0)
408                 return NULL;
409         return class_num2obd(dev);
410 }
411
412 struct obd_device *class_num2obd(int num)
413 {
414         struct obd_device *obd = NULL;
415
416         if (num < class_devno_max()) {
417                 obd = obd_devs[num];
418                 if (obd == NULL) {
419                         return NULL;
420                 }
421
422                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
423                          "%p obd_magic %08x != %08x\n",
424                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
425                 LASSERTF(obd->obd_minor == num,
426                          "%p obd_minor %0d != %0d\n",
427                          obd, obd->obd_minor, num);
428         }
429
430         return obd;
431 }
432
433 void class_obd_list(void)
434 {
435         char *status;
436         int i;
437
438         spin_lock(&obd_dev_lock);
439         for (i = 0; i < class_devno_max(); i++) {
440                 struct obd_device *obd = class_num2obd(i);
441                 if (obd == NULL)
442                         continue;
443                 if (obd->obd_stopping)
444                         status = "ST";
445                 else if (obd->obd_set_up)
446                         status = "UP";
447                 else if (obd->obd_attached)
448                         status = "AT";
449                 else
450                         status = "--";
451                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
452                          i, status, obd->obd_type->typ_name,
453                          obd->obd_name, obd->obd_uuid.uuid,
454                          atomic_read(&obd->obd_refcount));
455         }
456         spin_unlock(&obd_dev_lock);
457         return;
458 }
459
460 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
461    specified, then only the client with that uuid is returned,
462    otherwise any client connected to the tgt is returned. */
463 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
464                                           const char * typ_name,
465                                           struct obd_uuid *grp_uuid)
466 {
467         int i;
468
469         spin_lock(&obd_dev_lock);
470         for (i = 0; i < class_devno_max(); i++) {
471                 struct obd_device *obd = class_num2obd(i);
472                 if (obd == NULL)
473                         continue;
474                 if ((strncmp(obd->obd_type->typ_name, typ_name,
475                              strlen(typ_name)) == 0)) {
476                         if (obd_uuid_equals(tgt_uuid,
477                                             &obd->u.cli.cl_target_uuid) &&
478                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
479                                                          &obd->obd_uuid) : 1)) {
480                                 spin_unlock(&obd_dev_lock);
481                                 return obd;
482                         }
483                 }
484         }
485         spin_unlock(&obd_dev_lock);
486
487         return NULL;
488 }
489
490 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
491                                             struct obd_uuid *grp_uuid)
492 {
493         struct obd_device *obd;
494
495         obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
496         if (!obd)
497                 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
498                                             grp_uuid);
499         return obd;
500 }
501
502 /* Iterate the obd_device list looking devices have grp_uuid. Start
503    searching at *next, and if a device is found, the next index to look
504    at is saved in *next. If next is NULL, then the first matching device
505    will always be returned. */
506 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
507 {
508         int i;
509
510         if (next == NULL)
511                 i = 0;
512         else if (*next >= 0 && *next < class_devno_max())
513                 i = *next;
514         else
515                 return NULL;
516
517         spin_lock(&obd_dev_lock);
518         for (; i < class_devno_max(); i++) {
519                 struct obd_device *obd = class_num2obd(i);
520                 if (obd == NULL)
521                         continue;
522                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
523                         if (next != NULL)
524                                 *next = i+1;
525                         spin_unlock(&obd_dev_lock);
526                         return obd;
527                 }
528         }
529         spin_unlock(&obd_dev_lock);
530
531         return NULL;
532 }
533
534
535 void obd_cleanup_caches(void)
536 {
537         int rc;
538
539         ENTRY;
540         if (obd_device_cachep) {
541                 rc = cfs_mem_cache_destroy(obd_device_cachep);
542                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
543                 obd_device_cachep = NULL;
544         }
545         if (obdo_cachep) {
546                 rc = cfs_mem_cache_destroy(obdo_cachep);
547                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
548                 obdo_cachep = NULL;
549         }
550         if (import_cachep) {
551                 rc = cfs_mem_cache_destroy(import_cachep);
552                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
553                 import_cachep = NULL;
554         }
555         if (capa_cachep) {
556                 rc = cfs_mem_cache_destroy(capa_cachep);
557                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
558                 capa_cachep = NULL;
559         }
560         EXIT;
561 }
562
563 int obd_init_caches(void)
564 {
565         ENTRY;
566
567         LASSERT(obd_device_cachep == NULL);
568         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
569                                                  sizeof(struct obd_device), 
570                                                  0, 0);
571         if (!obd_device_cachep)
572                 GOTO(out, -ENOMEM);
573
574         LASSERT(obdo_cachep == NULL);
575         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
576                                            0, 0);
577         if (!obdo_cachep)
578                 GOTO(out, -ENOMEM);
579
580         LASSERT(import_cachep == NULL);
581         import_cachep = cfs_mem_cache_create("ll_import_cache",
582                                              sizeof(struct obd_import),
583                                              0, 0);
584         if (!import_cachep)
585                 GOTO(out, -ENOMEM);
586
587         LASSERT(capa_cachep == NULL);
588         capa_cachep = cfs_mem_cache_create("capa_cache",
589                                            sizeof(struct obd_capa), 0, 0);
590         if (!capa_cachep)
591                 GOTO(out, -ENOMEM);
592
593         RETURN(0);
594  out:
595         obd_cleanup_caches();
596         RETURN(-ENOMEM);
597
598 }
599
600 /* map connection to client */
601 struct obd_export *class_conn2export(struct lustre_handle *conn)
602 {
603         struct obd_export *export;
604         ENTRY;
605
606         if (!conn) {
607                 CDEBUG(D_CACHE, "looking for null handle\n");
608                 RETURN(NULL);
609         }
610
611         if (conn->cookie == -1) {  /* this means assign a new connection */
612                 CDEBUG(D_CACHE, "want a new connection\n");
613                 RETURN(NULL);
614         }
615
616         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
617         export = class_handle2object(conn->cookie);
618         RETURN(export);
619 }
620
621 struct obd_device *class_exp2obd(struct obd_export *exp)
622 {
623         if (exp)
624                 return exp->exp_obd;
625         return NULL;
626 }
627
628 struct obd_device *class_conn2obd(struct lustre_handle *conn)
629 {
630         struct obd_export *export;
631         export = class_conn2export(conn);
632         if (export) {
633                 struct obd_device *obd = export->exp_obd;
634                 class_export_put(export);
635                 return obd;
636         }
637         return NULL;
638 }
639
640 struct obd_import *class_exp2cliimp(struct obd_export *exp)
641 {
642         struct obd_device *obd = exp->exp_obd;
643         if (obd == NULL)
644                 return NULL;
645         return obd->u.cli.cl_import;
646 }
647
648 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
649 {
650         struct obd_device *obd = class_conn2obd(conn);
651         if (obd == NULL)
652                 return NULL;
653         return obd->u.cli.cl_import;
654 }
655
656 /* Export management functions */
657 static void export_handle_addref(void *export)
658 {
659         class_export_get(export);
660 }
661
662 void __class_export_put(struct obd_export *exp)
663 {
664         if (atomic_dec_and_test(&exp->exp_refcount)) {
665                 LASSERT (list_empty(&exp->exp_obd_chain));
666
667                 CDEBUG(D_IOCTL, "final put %p/%s\n",
668                        exp, exp->exp_client_uuid.uuid);
669         
670                 spin_lock(&obd_zombie_impexp_lock);
671                 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
672                 spin_unlock(&obd_zombie_impexp_lock);
673
674                 if (obd_zombie_impexp_notify != NULL)
675                         obd_zombie_impexp_notify();
676         }
677 }
678 EXPORT_SYMBOL(__class_export_put);
679
680 void class_export_destroy(struct obd_export *exp)
681 {
682         struct obd_device *obd = exp->exp_obd;
683         ENTRY;
684
685         LASSERT (atomic_read(&exp->exp_refcount) == 0);
686
687         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
688                exp->exp_client_uuid.uuid, obd->obd_name);
689
690         LASSERT(obd != NULL);
691
692         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
693         if (exp->exp_connection)
694                 ptlrpc_put_connection_superhack(exp->exp_connection);
695
696         LASSERT(list_empty(&exp->exp_outstanding_replies));
697         LASSERT(list_empty(&exp->exp_req_replay_queue));
698         obd_destroy_export(exp);
699  
700         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
701         class_decref(obd);
702         EXIT;
703 }
704
705 /* Creates a new export, adds it to the hash table, and returns a
706  * pointer to it. The refcount is 2: one for the hash reference, and
707  * one for the pointer returned by this function. */
708 struct obd_export *class_new_export(struct obd_device *obd,
709                                     struct obd_uuid *cluuid)
710 {
711         struct obd_export *export;
712         int rc = 0;
713
714         OBD_ALLOC_PTR(export);
715         if (!export)
716                 return ERR_PTR(-ENOMEM);
717
718         export->exp_conn_cnt = 0;
719         atomic_set(&export->exp_refcount, 2);
720         atomic_set(&export->exp_rpc_count, 0);
721         export->exp_obd = obd;
722         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
723         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
724         /* XXX this should be in LDLM init */
725         CFS_INIT_LIST_HEAD(&export->exp_ldlm_data.led_held_locks);
726         spin_lock_init(&export->exp_ldlm_data.led_lock);
727
728         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
729         class_handle_hash(&export->exp_handle, export_handle_addref);
730         export->exp_last_request_time = cfs_time_current_sec();
731         spin_lock_init(&export->exp_lock);
732         INIT_HLIST_NODE(&export->exp_uuid_hash);
733         INIT_HLIST_NODE(&export->exp_nid_hash);
734
735         export->exp_sp_peer = LUSTRE_SP_ANY;
736         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
737         export->exp_client_uuid = *cluuid;
738         obd_init_export(export);
739
740         spin_lock(&obd->obd_dev_lock);
741         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
742                rc = lustre_hash_additem_unique(obd->obd_uuid_hash_body, cluuid, 
743                                                &export->exp_uuid_hash);
744                if (rc != 0) {
745                        CWARN("%s: denying duplicate export for %s\n",
746                              obd->obd_name, cluuid->uuid);
747                        spin_unlock(&obd->obd_dev_lock);
748                        class_handle_unhash(&export->exp_handle);
749                        OBD_FREE_PTR(export);
750                        return ERR_PTR(-EALREADY);
751                }
752         }
753
754         LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
755         class_incref(obd);
756         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
757         list_add_tail(&export->exp_obd_chain_timed,
758                       &export->exp_obd->obd_exports_timed);
759         export->exp_obd->obd_num_exports++;
760         spin_unlock(&obd->obd_dev_lock);
761
762         return export;
763 }
764 EXPORT_SYMBOL(class_new_export);
765
766 void class_unlink_export(struct obd_export *exp)
767 {
768         class_handle_unhash(&exp->exp_handle);
769
770         spin_lock(&exp->exp_obd->obd_dev_lock);
771         /* delete an uuid-export hashitem from hashtables */
772         if (!hlist_unhashed(&exp->exp_uuid_hash)) {
773                 lustre_hash_delitem(exp->exp_obd->obd_uuid_hash_body, 
774                                     &exp->exp_client_uuid, &exp->exp_uuid_hash);
775         }
776         list_del_init(&exp->exp_obd_chain);
777         list_del_init(&exp->exp_obd_chain_timed);
778         exp->exp_obd->obd_num_exports--;
779         spin_unlock(&exp->exp_obd->obd_dev_lock);
780
781         class_export_put(exp);
782 }
783 EXPORT_SYMBOL(class_unlink_export);
784
785 /* Import management functions */
786 static void import_handle_addref(void *import)
787 {
788         class_import_get(import);
789 }
790
791 struct obd_import *class_import_get(struct obd_import *import)
792 {
793         LASSERT(atomic_read(&import->imp_refcount) >= 0);
794         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
795         atomic_inc(&import->imp_refcount);
796         CDEBUG(D_INFO, "import %p refcount=%d\n", import,
797                atomic_read(&import->imp_refcount));
798         return import;
799 }
800 EXPORT_SYMBOL(class_import_get);
801
802 void class_import_put(struct obd_import *import)
803 {
804         ENTRY;
805
806         CDEBUG(D_INFO, "import %p refcount=%d\n", import,
807                atomic_read(&import->imp_refcount) - 1);
808
809         LASSERT(atomic_read(&import->imp_refcount) > 0);
810         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
811         LASSERT(list_empty(&import->imp_zombie_chain));
812
813         if (atomic_dec_and_test(&import->imp_refcount)) {
814
815                 CDEBUG(D_INFO, "final put import %p\n", import);
816                 
817                 spin_lock(&obd_zombie_impexp_lock);
818                 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
819                 spin_unlock(&obd_zombie_impexp_lock);
820
821                 if (obd_zombie_impexp_notify != NULL)
822                         obd_zombie_impexp_notify();
823         }
824
825         EXIT;
826 }
827 EXPORT_SYMBOL(class_import_put);
828
829 void class_import_destroy(struct obd_import *import)
830 {
831         ENTRY;
832         
833         CDEBUG(D_IOCTL, "destroying import %p for %s\n", import,
834                 import->imp_obd->obd_name);
835
836         LASSERT(atomic_read(&import->imp_refcount) == 0);
837
838         ptlrpc_put_connection_superhack(import->imp_connection);
839
840         while (!list_empty(&import->imp_conn_list)) {
841                 struct obd_import_conn *imp_conn;
842
843                 imp_conn = list_entry(import->imp_conn_list.next,
844                                       struct obd_import_conn, oic_item);
845                 list_del(&imp_conn->oic_item);
846                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
847                 OBD_FREE(imp_conn, sizeof(*imp_conn));
848         }
849
850         LASSERT(import->imp_sec == NULL);
851         class_decref(import->imp_obd);
852         OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
853         EXIT;
854 }
855
856 static void init_imp_at(struct imp_at *at) {
857         int i;
858         at_init(&at->iat_net_latency, 0, 0);
859         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
860                 /* max service estimates are tracked on the server side, so
861                    don't use the AT history here, just use the last reported
862                    val. (But keep hist for proc histogram, worst_ever) */
863                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
864                         AT_FLG_NOHIST);
865         }
866 }
867
868 struct obd_import *class_new_import(struct obd_device *obd)
869 {
870         struct obd_import *imp;
871
872         OBD_ALLOC(imp, sizeof(*imp));
873         if (imp == NULL)
874                 return NULL;
875
876         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
877         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
878         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
879         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
880         spin_lock_init(&imp->imp_lock);
881         imp->imp_last_success_conn = 0;
882         imp->imp_state = LUSTRE_IMP_NEW;
883         imp->imp_obd = class_incref(obd);
884         sema_init(&imp->imp_sec_mutex, 1);
885         cfs_waitq_init(&imp->imp_recovery_waitq);
886
887         atomic_set(&imp->imp_refcount, 2);
888         atomic_set(&imp->imp_inflight, 0);
889         atomic_set(&imp->imp_replay_inflight, 0);
890         atomic_set(&imp->imp_inval_count, 0);
891         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
892         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
893         class_handle_hash(&imp->imp_handle, import_handle_addref);
894         init_imp_at(&imp->imp_at);
895
896         /* the default magic is V2, will be used in connect RPC, and
897          * then adjusted according to the flags in request/reply. */
898         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
899
900         return imp;
901 }
902 EXPORT_SYMBOL(class_new_import);
903
904 void class_destroy_import(struct obd_import *import)
905 {
906         LASSERT(import != NULL);
907         LASSERT(import != LP_POISON);
908
909         class_handle_unhash(&import->imp_handle);
910
911         spin_lock(&import->imp_lock);
912         import->imp_generation++;
913         spin_unlock(&import->imp_lock);
914         class_import_put(import);
915 }
916 EXPORT_SYMBOL(class_destroy_import);
917
918 /* A connection defines an export context in which preallocation can
919    be managed. This releases the export pointer reference, and returns
920    the export handle, so the export refcount is 1 when this function
921    returns. */
922 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
923                   struct obd_uuid *cluuid)
924 {
925         struct obd_export *export;
926         LASSERT(conn != NULL);
927         LASSERT(obd != NULL);
928         LASSERT(cluuid != NULL);
929         ENTRY;
930
931         export = class_new_export(obd, cluuid);
932         if (IS_ERR(export))
933                 RETURN(PTR_ERR(export));
934
935         conn->cookie = export->exp_handle.h_cookie;
936         class_export_put(export);
937
938         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
939                cluuid->uuid, conn->cookie);
940         RETURN(0);
941 }
942 EXPORT_SYMBOL(class_connect);
943
944 /* if export is involved in recovery then clean up related things */
945 void class_export_recovery_cleanup(struct obd_export *exp)
946 {
947         struct obd_device *obd = exp->exp_obd;
948
949         spin_lock_bh(&obd->obd_processing_task_lock);
950         if (obd->obd_recovering && exp->exp_in_recovery) {
951                 spin_lock(&exp->exp_lock);
952                 exp->exp_in_recovery = 0;
953                 spin_unlock(&exp->exp_lock);
954                 obd->obd_connected_clients--;
955                 /* each connected client is counted as recoverable */
956                 obd->obd_recoverable_clients--;
957                 if (exp->exp_req_replay_needed) {
958                         spin_lock(&exp->exp_lock);
959                         exp->exp_req_replay_needed = 0;
960                         spin_unlock(&exp->exp_lock);
961                         LASSERT(atomic_read(&obd->obd_req_replay_clients));
962                         atomic_dec(&obd->obd_req_replay_clients);
963                 }
964                 if (exp->exp_lock_replay_needed) {
965                         spin_lock(&exp->exp_lock);
966                         exp->exp_lock_replay_needed = 0;
967                         spin_unlock(&exp->exp_lock);
968                         LASSERT(atomic_read(&obd->obd_lock_replay_clients));
969                         atomic_dec(&obd->obd_lock_replay_clients);                
970                 }
971         }
972         spin_unlock_bh(&obd->obd_processing_task_lock);
973 }
974
975 /* This function removes two references from the export: one for the
976  * hash entry and one for the export pointer passed in.  The export
977  * pointer passed to this function is destroyed should not be used
978  * again. */
979 int class_disconnect(struct obd_export *export)
980 {
981         int already_disconnected;
982         ENTRY;
983
984         if (export == NULL) {
985                 fixme();
986                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
987                 RETURN(-EINVAL);
988         }
989
990         spin_lock(&export->exp_lock);
991         already_disconnected = export->exp_disconnected;
992         export->exp_disconnected = 1;
993
994         if (!hlist_unhashed(&export->exp_nid_hash)) {
995                 lustre_hash_delitem(export->exp_obd->obd_nid_hash_body,
996                                     &export->exp_connection->c_peer.nid, &export->exp_nid_hash);
997         }
998         spin_unlock(&export->exp_lock);
999
1000         /* class_cleanup(), abort_recovery(), and class_fail_export()
1001          * all end up in here, and if any of them race we shouldn't
1002          * call extra class_export_puts(). */
1003         if (already_disconnected)
1004                 RETURN(0);
1005
1006         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1007                export->exp_handle.h_cookie);
1008
1009         class_export_recovery_cleanup(export);
1010         class_unlink_export(export);
1011         class_export_put(export);
1012         RETURN(0);
1013 }
1014
1015 static void class_disconnect_export_list(struct list_head *list, int flags)
1016 {
1017         int rc;
1018         struct lustre_handle fake_conn;
1019         struct obd_export *fake_exp, *exp;
1020         ENTRY;
1021
1022         /* It's possible that an export may disconnect itself, but
1023          * nothing else will be added to this list. */
1024         while (!list_empty(list)) {
1025                 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1026                 class_export_get(exp);
1027
1028                 spin_lock(&exp->exp_lock);
1029                 exp->exp_flags = flags;
1030                 spin_unlock(&exp->exp_lock);
1031
1032                 if (obd_uuid_equals(&exp->exp_client_uuid,
1033                                     &exp->exp_obd->obd_uuid)) {
1034                         CDEBUG(D_HA,
1035                                "exp %p export uuid == obd uuid, don't discon\n",
1036                                exp);
1037                         /* Need to delete this now so we don't end up pointing
1038                          * to work_list later when this export is cleaned up. */
1039                         list_del_init(&exp->exp_obd_chain);
1040                         class_export_put(exp);
1041                         continue;
1042                 }
1043
1044                 fake_conn.cookie = exp->exp_handle.h_cookie;
1045                 fake_exp = class_conn2export(&fake_conn);
1046                 if (!fake_exp) {
1047                         class_export_put(exp);
1048                         continue;
1049                 }
1050
1051                 spin_lock(&fake_exp->exp_lock);
1052                 fake_exp->exp_flags = flags;
1053                 spin_unlock(&fake_exp->exp_lock);
1054
1055                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1056                        "last request at %ld\n",
1057                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1058                        exp, exp->exp_last_request_time);
1059                 rc = obd_disconnect(fake_exp);
1060                 class_export_put(exp);
1061         }
1062         EXIT;
1063 }
1064
1065 static inline int get_exp_flags_from_obd(struct obd_device *obd)
1066 {
1067         return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
1068                 (obd->obd_force ? OBD_OPT_FORCE : 0));
1069 }
1070
1071 void class_disconnect_exports(struct obd_device *obd)
1072 {
1073         struct list_head work_list;
1074         ENTRY;
1075
1076         /* Move all of the exports from obd_exports to a work list, en masse. */
1077         spin_lock(&obd->obd_dev_lock);
1078         list_add(&work_list, &obd->obd_exports);
1079         list_del_init(&obd->obd_exports);
1080         spin_unlock(&obd->obd_dev_lock);
1081         
1082         if (!list_empty(&work_list)) {
1083                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1084                        "disconnecting them\n", obd->obd_minor, obd);
1085                 class_disconnect_export_list(&work_list, 
1086                                              get_exp_flags_from_obd(obd));
1087         } else
1088                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1089                        obd->obd_minor, obd);
1090         EXIT;
1091 }
1092 EXPORT_SYMBOL(class_disconnect_exports);
1093
1094 /* Remove exports that have not completed recovery.
1095  */
1096 int class_disconnect_stale_exports(struct obd_device *obd,
1097                                    int (*test_export)(struct obd_export *))
1098 {
1099         struct list_head work_list;
1100         struct list_head *pos, *n;
1101         struct obd_export *exp;
1102         int cnt = 0;
1103         ENTRY;
1104
1105         CFS_INIT_LIST_HEAD(&work_list);
1106         spin_lock(&obd->obd_dev_lock);
1107         list_for_each_safe(pos, n, &obd->obd_exports) {
1108                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1109                 if (test_export(exp))
1110                         continue;
1111                 
1112                 list_del(&exp->exp_obd_chain);
1113                 list_add(&exp->exp_obd_chain, &work_list);
1114                 /* don't count self-export as client */
1115                 if (obd_uuid_equals(&exp->exp_client_uuid,
1116                                      &exp->exp_obd->obd_uuid))
1117                         continue;
1118
1119                 cnt++;
1120                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1121                        obd->obd_name, exp->exp_client_uuid.uuid,
1122                        exp->exp_connection == NULL ? "<unknown>" :
1123                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1124         }
1125         spin_unlock(&obd->obd_dev_lock);
1126
1127         CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
1128                obd->obd_name, cnt);
1129         class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
1130         RETURN(cnt);
1131 }
1132 EXPORT_SYMBOL(class_disconnect_stale_exports);
1133
1134 int oig_init(struct obd_io_group **oig_out)
1135 {
1136         struct obd_io_group *oig;
1137         ENTRY;
1138
1139         OBD_ALLOC(oig, sizeof(*oig));
1140         if (oig == NULL)
1141                 RETURN(-ENOMEM);
1142
1143         spin_lock_init(&oig->oig_lock);
1144         oig->oig_rc = 0;
1145         oig->oig_pending = 0;
1146         atomic_set(&oig->oig_refcount, 1);
1147         cfs_waitq_init(&oig->oig_waitq);
1148         CFS_INIT_LIST_HEAD(&oig->oig_occ_list);
1149
1150         *oig_out = oig;
1151         RETURN(0);
1152 };
1153 EXPORT_SYMBOL(oig_init);
1154
1155 static inline void oig_grab(struct obd_io_group *oig)
1156 {
1157         atomic_inc(&oig->oig_refcount);
1158 }
1159
1160 void oig_release(struct obd_io_group *oig)
1161 {
1162         if (atomic_dec_and_test(&oig->oig_refcount))
1163                 OBD_FREE(oig, sizeof(*oig));
1164 }
1165 EXPORT_SYMBOL(oig_release);
1166
1167 int oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ)
1168 {
1169         int rc = 0;
1170         CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
1171         spin_lock(&oig->oig_lock);
1172         if (oig->oig_rc) {
1173                 rc = oig->oig_rc;
1174         } else {
1175                 oig->oig_pending++;
1176                 if (occ != NULL)
1177                         list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
1178         }
1179         spin_unlock(&oig->oig_lock);
1180         oig_grab(oig);
1181
1182         return rc;
1183 }
1184 EXPORT_SYMBOL(oig_add_one);
1185
1186 void oig_complete_one(struct obd_io_group *oig,
1187                       struct oig_callback_context *occ, int rc)
1188 {
1189         cfs_waitq_t *wake = NULL;
1190         int old_rc;
1191
1192         spin_lock(&oig->oig_lock);
1193
1194         if (occ != NULL)
1195                 list_del_init(&occ->occ_oig_item);
1196
1197         old_rc = oig->oig_rc;
1198         if (oig->oig_rc == 0 && rc != 0)
1199                 oig->oig_rc = rc;
1200
1201         if (--oig->oig_pending <= 0)
1202                 wake = &oig->oig_waitq;
1203
1204         spin_unlock(&oig->oig_lock);
1205
1206         CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
1207                         "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
1208                         oig->oig_pending);
1209         if (wake)
1210                 cfs_waitq_signal(wake);
1211         oig_release(oig);
1212 }
1213 EXPORT_SYMBOL(oig_complete_one);
1214
1215 static int oig_done(struct obd_io_group *oig)
1216 {
1217         int rc = 0;
1218         spin_lock(&oig->oig_lock);
1219         if (oig->oig_pending <= 0)
1220                 rc = 1;
1221         spin_unlock(&oig->oig_lock);
1222         return rc;
1223 }
1224
1225 static void interrupted_oig(void *data)
1226 {
1227         struct obd_io_group *oig = data;
1228         struct oig_callback_context *occ;
1229
1230         spin_lock(&oig->oig_lock);
1231         /* We need to restart the processing each time we drop the lock, as
1232          * it is possible other threads called oig_complete_one() to remove
1233          * an entry elsewhere in the list while we dropped lock.  We need to
1234          * drop the lock because osc_ap_completion() calls oig_complete_one()
1235          * which re-gets this lock ;-) as well as a lock ordering issue. */
1236 restart:
1237         list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
1238                 if (occ->interrupted)
1239                         continue;
1240                 occ->interrupted = 1;
1241                 spin_unlock(&oig->oig_lock);
1242                 occ->occ_interrupted(occ);
1243                 spin_lock(&oig->oig_lock);
1244                 goto restart;
1245         }
1246         spin_unlock(&oig->oig_lock);
1247 }
1248
1249 int oig_wait(struct obd_io_group *oig)
1250 {
1251         struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
1252         int rc;
1253
1254         CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
1255
1256         do {
1257                 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
1258                 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
1259                 /* we can't continue until the oig has emptied and stopped
1260                  * referencing state that the caller will free upon return */
1261                 if (rc == -EINTR)
1262                         lwi = (struct l_wait_info){ 0, };
1263         } while (rc == -EINTR);
1264
1265         LASSERTF(oig->oig_pending == 0,
1266                  "exiting oig_wait(oig = %p) with %d pending\n", oig,
1267                  oig->oig_pending);
1268
1269         CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
1270         return oig->oig_rc;
1271 }
1272 EXPORT_SYMBOL(oig_wait);
1273
1274 void class_fail_export(struct obd_export *exp)
1275 {
1276         int rc, already_failed;
1277
1278         spin_lock(&exp->exp_lock);
1279         already_failed = exp->exp_failed;
1280         exp->exp_failed = 1;
1281         spin_unlock(&exp->exp_lock);
1282
1283         if (already_failed) {
1284                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1285                        exp, exp->exp_client_uuid.uuid);
1286                 return;
1287         }
1288
1289         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1290                exp, exp->exp_client_uuid.uuid);
1291
1292         if (obd_dump_on_timeout)
1293                 libcfs_debug_dumplog();
1294
1295         /* Most callers into obd_disconnect are removing their own reference
1296          * (request, for example) in addition to the one from the hash table.
1297          * We don't have such a reference here, so make one. */
1298         class_export_get(exp);
1299         rc = obd_disconnect(exp);
1300         if (rc)
1301                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1302         else
1303                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1304                        exp, exp->exp_client_uuid.uuid);
1305 }
1306 EXPORT_SYMBOL(class_fail_export);
1307
1308 char *obd_export_nid2str(struct obd_export *exp)
1309 {
1310         if (exp->exp_connection != NULL)
1311                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1312
1313         return "(no nid)";
1314 }
1315 EXPORT_SYMBOL(obd_export_nid2str);
1316
1317 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1318 {
1319         struct obd_export *doomed_exp = NULL;
1320         int exports_evicted = 0;
1321
1322         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1323
1324         do {
1325                 doomed_exp = lustre_hash_get_object_by_key(obd->obd_nid_hash_body,
1326                                                            &nid_key);
1327                 if (doomed_exp == NULL)
1328                         break;
1329
1330                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1331                          "nid %s found, wanted nid %s, requested nid %s\n",
1332                          obd_export_nid2str(doomed_exp),
1333                          libcfs_nid2str(nid_key), nid);        
1334                 LASSERTF(doomed_exp != obd->obd_self_export,
1335                          "self-export is hashed by NID?\n");
1336                 exports_evicted++;
1337                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1338                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1339                        exports_evicted);
1340                 class_fail_export(doomed_exp);
1341                 class_export_put(doomed_exp);
1342         } while (1);
1343
1344         if (!exports_evicted)
1345                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1346                        obd->obd_name, nid);
1347         return exports_evicted;
1348 }
1349 EXPORT_SYMBOL(obd_export_evict_by_nid);
1350
1351 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1352 {
1353         struct obd_export *doomed_exp = NULL;
1354         struct obd_uuid doomed;
1355         int exports_evicted = 0;
1356
1357         obd_str2uuid(&doomed, uuid);
1358         if (obd_uuid_equals(&doomed, &obd->obd_uuid)) {
1359                 CERROR("%s: can't evict myself\n", obd->obd_name);
1360                 return exports_evicted;
1361         }
1362
1363         doomed_exp = lustre_hash_get_object_by_key(obd->obd_uuid_hash_body, 
1364                                                    &doomed);
1365
1366         if (doomed_exp == NULL) {
1367                 CERROR("%s: can't disconnect %s: no exports found\n",
1368                        obd->obd_name, uuid);
1369         } else {
1370                 CWARN("%s: evicting %s at adminstrative request\n",
1371                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1372                 class_fail_export(doomed_exp);
1373                 class_export_put(doomed_exp);
1374                 exports_evicted++;
1375         }
1376
1377         return exports_evicted;
1378 }
1379 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1380
1381 /**
1382  * kill zombie imports and exports
1383  */
1384 void obd_zombie_impexp_cull(void)
1385 {
1386         struct obd_import *import;
1387         struct obd_export *export;
1388         ENTRY;
1389
1390         do {
1391                 spin_lock (&obd_zombie_impexp_lock);
1392
1393                 import = NULL;
1394                 if (!list_empty(&obd_zombie_imports)) {
1395                         import = list_entry(obd_zombie_imports.next,
1396                                             struct obd_import,
1397                                             imp_zombie_chain);
1398                         list_del(&import->imp_zombie_chain);
1399                 }
1400
1401                 export = NULL;
1402                 if (!list_empty(&obd_zombie_exports)) {
1403                         export = list_entry(obd_zombie_exports.next,
1404                                             struct obd_export,
1405                                             exp_obd_chain);
1406                         list_del_init(&export->exp_obd_chain);
1407                 }
1408
1409                 spin_unlock(&obd_zombie_impexp_lock);
1410
1411                 if (import != NULL)
1412                         class_import_destroy(import);
1413
1414                 if (export != NULL)
1415                         class_export_destroy(export);
1416
1417         } while (import != NULL || export != NULL);
1418         EXIT;
1419 }
1420
1421 static struct completion        obd_zombie_start;
1422 static struct completion        obd_zombie_stop;
1423 static unsigned long            obd_zombie_flags;
1424 static cfs_waitq_t              obd_zombie_waitq;
1425
1426 enum {
1427         OBD_ZOMBIE_STOP = 1
1428 };
1429
1430 /**
1431  * check for work for kill zombie import/export thread.
1432  */
1433 int obd_zombie_impexp_check(void *arg)
1434 {
1435         int rc;
1436
1437         spin_lock(&obd_zombie_impexp_lock);
1438         rc = list_empty(&obd_zombie_imports) &&
1439              list_empty(&obd_zombie_exports) &&
1440              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1441
1442         spin_unlock(&obd_zombie_impexp_lock);
1443
1444         RETURN(rc);
1445 }
1446
1447 /**
1448  * notify import/export destroy thread about new zombie.
1449  */
1450 static void obd_zombie_impexp_notify(void)
1451 {
1452         cfs_waitq_signal(&obd_zombie_waitq);
1453 }
1454
1455 #ifdef __KERNEL__
1456
1457 /**
1458  * destroy zombie export/import thread.
1459  */
1460 static int obd_zombie_impexp_thread(void *unused)
1461 {
1462         int rc;
1463
1464         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1465                 complete(&obd_zombie_start);
1466                 RETURN(rc);
1467         }
1468
1469         complete(&obd_zombie_start);
1470
1471         while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1472                 struct l_wait_info lwi = { 0 };
1473
1474                 l_wait_event(obd_zombie_waitq, !obd_zombie_impexp_check(NULL), &lwi);
1475
1476                 obd_zombie_impexp_cull();
1477         }
1478
1479         complete(&obd_zombie_stop);
1480
1481         RETURN(0);
1482 }
1483
1484 #else /* ! KERNEL */
1485
1486 static atomic_t zombie_recur = ATOMIC_INIT(0);
1487 static void *obd_zombie_impexp_work_cb;
1488 static void *obd_zombie_impexp_idle_cb;
1489
1490 int obd_zombie_impexp_kill(void *arg)
1491 {
1492         int rc = 0;
1493
1494         if (atomic_inc_return(&zombie_recur) == 1) {
1495                 obd_zombie_impexp_cull();
1496                 rc = 1;
1497         }
1498         atomic_dec(&zombie_recur);
1499         return rc;
1500 }
1501
1502 #endif
1503
1504 /**
1505  * start destroy zombie import/export thread
1506  */
1507 int obd_zombie_impexp_init(void)
1508 {
1509         int rc;
1510
1511         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1512         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1513         spin_lock_init(&obd_zombie_impexp_lock);
1514         init_completion(&obd_zombie_start);
1515         init_completion(&obd_zombie_stop);
1516         cfs_waitq_init(&obd_zombie_waitq);
1517
1518 #ifdef __KERNEL__
1519         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1520         if (rc < 0)
1521                 RETURN(rc);
1522
1523         wait_for_completion(&obd_zombie_start);
1524 #else
1525
1526         obd_zombie_impexp_work_cb =
1527                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1528                                                  &obd_zombie_impexp_kill, NULL);
1529
1530         obd_zombie_impexp_idle_cb =
1531                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1532                                                  &obd_zombie_impexp_check, NULL);
1533         rc = 0;
1534
1535 #endif
1536         RETURN(rc);
1537 }
1538 /**
1539  * stop destroy zombie import/export thread
1540  */
1541 void obd_zombie_impexp_stop(void)
1542 {
1543         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1544         obd_zombie_impexp_notify();
1545 #ifdef __KERNEL__
1546         wait_for_completion(&obd_zombie_stop);
1547 #else
1548         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1549         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1550 #endif
1551 }
1552