Whamcloud - gitweb
2de3465156e60d17cbd8f67e6da71c09f4274a79
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
50
/* List of obd types, defined elsewhere; class_search_type() walks it and
 * class_register_type()/class_unregister_type() add to and remove from it. */
51 extern struct list_head obd_types;
/* Held around the obd_types walk, list_add() and list_del() in the three
 * functions named above. */
52 spinlock_t obd_types_lock;
53
/* Slab cache used by obd_device_alloc()/obd_device_free(); sized for
 * struct obd_device in obd_init_caches(). */
54 cfs_mem_cache_t *obd_device_cachep;
/* Slab cache sized for struct obdo in obd_init_caches(). */
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
/* Slab cache sized for struct obd_import in obd_init_caches(). */
57 cfs_mem_cache_t *import_cachep;
58
/* class_import_put() queues an import here (via imp_zombie_chain) when its
 * last reference is dropped. */
59 struct list_head  obd_zombie_imports;
/* class_export_put() queues an export here (via exp_obd_chain) when its
 * last reference is dropped. */
60 struct list_head  obd_zombie_exports;
/* Held while adding to either zombie list above. */
61 spinlock_t        obd_zombie_impexp_lock;
/* Called right after an import/export is queued on a zombie list; the
 * definition is not visible in this part of the file. */
62 static void obd_zombie_impexp_notify(void);
63
/* Function pointer invoked on exp_connection, imp_connection and oic_conn
 * while exports/imports are destroyed; presumably drops a connection
 * reference -- it is assigned elsewhere, TODO confirm. */
64 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
65
66 /*
67  * support functions: we could use inter-module communication, but this
68  * is more portable to other OS's
69  */
70 static struct obd_device *obd_device_alloc(void)
71 {
72         struct obd_device *obd;
73
74         OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
75         if (obd != NULL) {
76                 obd->obd_magic = OBD_DEVICE_MAGIC;
77         }
78         return obd;
79 }
80 EXPORT_SYMBOL(obd_device_alloc);
81
82 static void obd_device_free(struct obd_device *obd)
83 {
84         LASSERT(obd != NULL);
85         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
86                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
87         if (obd->obd_namespace != NULL) {
88                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89                        obd, obd->obd_namespace, obd->obd_force);
90                 LBUG();
91         }
92         lu_ref_fini(&obd->obd_reference);
93         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
94 }
95
96 struct obd_type *class_search_type(const char *name)
97 {
98         struct list_head *tmp;
99         struct obd_type *type;
100
101         spin_lock(&obd_types_lock);
102         list_for_each(tmp, &obd_types) {
103                 type = list_entry(tmp, struct obd_type, typ_chain);
104                 if (strcmp(type->typ_name, name) == 0) {
105                         spin_unlock(&obd_types_lock);
106                         return type;
107                 }
108         }
109         spin_unlock(&obd_types_lock);
110         return NULL;
111 }
112
113 struct obd_type *class_get_type(const char *name)
114 {
115         struct obd_type *type = class_search_type(name);
116
117 #ifdef CONFIG_KMOD
118         if (!type) {
119                 const char *modname = name;
120                 if (!request_module(modname)) {
121                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
122                         type = class_search_type(name);
123                 } else {
124                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
125                                            modname);
126                 }
127         }
128 #endif
129         if (type) {
130                 spin_lock(&type->obd_type_lock);
131                 type->typ_refcnt++;
132                 try_module_get(type->typ_dt_ops->o_owner);
133                 spin_unlock(&type->obd_type_lock);
134         }
135         return type;
136 }
137
138 void class_put_type(struct obd_type *type)
139 {
140         LASSERT(type);
141         spin_lock(&type->obd_type_lock);
142         type->typ_refcnt--;
143         module_put(type->typ_dt_ops->o_owner);
144         spin_unlock(&type->obd_type_lock);
145 }
146
147 #define CLASS_MAX_NAME 1024
148
149 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
150                         struct lprocfs_vars *vars, const char *name,
151                         struct lu_device_type *ldt)
152 {
153         struct obd_type *type;
154         int rc = 0;
155         ENTRY;
156
157         /* sanity check */
158         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
159
160         if (class_search_type(name)) {
161                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
162                 RETURN(-EEXIST);
163         }
164
165         rc = -ENOMEM;
166         OBD_ALLOC(type, sizeof(*type));
167         if (type == NULL)
168                 RETURN(rc);
169
170         OBD_ALLOC_PTR(type->typ_dt_ops);
171         OBD_ALLOC_PTR(type->typ_md_ops);
172         OBD_ALLOC(type->typ_name, strlen(name) + 1);
173
174         if (type->typ_dt_ops == NULL ||
175             type->typ_md_ops == NULL ||
176             type->typ_name == NULL)
177                 GOTO (failed, rc);
178
179         *(type->typ_dt_ops) = *dt_ops;
180         /* md_ops is optional */
181         if (md_ops)
182                 *(type->typ_md_ops) = *md_ops;
183         strcpy(type->typ_name, name);
184         spin_lock_init(&type->obd_type_lock);
185
186 #ifdef LPROCFS
187         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
188                                               vars, type);
189         if (IS_ERR(type->typ_procroot)) {
190                 rc = PTR_ERR(type->typ_procroot);
191                 type->typ_procroot = NULL;
192                 GOTO (failed, rc);
193         }
194 #endif
195         if (ldt != NULL) {
196                 type->typ_lu = ldt;
197                 rc = lu_device_type_init(ldt);
198                 if (rc != 0)
199                         GOTO (failed, rc);
200         }
201
202         spin_lock(&obd_types_lock);
203         list_add(&type->typ_chain, &obd_types);
204         spin_unlock(&obd_types_lock);
205
206         RETURN (0);
207
208  failed:
209         if (type->typ_name != NULL)
210                 OBD_FREE(type->typ_name, strlen(name) + 1);
211         if (type->typ_md_ops != NULL)
212                 OBD_FREE_PTR(type->typ_md_ops);
213         if (type->typ_dt_ops != NULL)
214                 OBD_FREE_PTR(type->typ_dt_ops);
215         OBD_FREE(type, sizeof(*type));
216         RETURN(rc);
217 }
218
219 int class_unregister_type(const char *name)
220 {
221         struct obd_type *type = class_search_type(name);
222         ENTRY;
223
224         if (!type) {
225                 CERROR("unknown obd type\n");
226                 RETURN(-EINVAL);
227         }
228
229         if (type->typ_refcnt) {
230                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
231                 /* This is a bad situation, let's make the best of it */
232                 /* Remove ops, but leave the name for debugging */
233                 OBD_FREE_PTR(type->typ_dt_ops);
234                 OBD_FREE_PTR(type->typ_md_ops);
235                 RETURN(-EBUSY);
236         }
237
238         if (type->typ_procroot) {
239                 lprocfs_remove(&type->typ_procroot);
240         }
241
242         if (type->typ_lu)
243                 lu_device_type_fini(type->typ_lu);
244
245         spin_lock(&obd_types_lock);
246         list_del(&type->typ_chain);
247         spin_unlock(&obd_types_lock);
248         OBD_FREE(type->typ_name, strlen(name) + 1);
249         if (type->typ_dt_ops != NULL)
250                 OBD_FREE_PTR(type->typ_dt_ops);
251         if (type->typ_md_ops != NULL)
252                 OBD_FREE_PTR(type->typ_md_ops);
253         OBD_FREE(type, sizeof(*type));
254         RETURN(0);
255 } /* class_unregister_type */
256
257 /**
258  * Create a new obd device.
259  *
260  * Find an empty slot in ::obd_devs[], create a new obd device in it.
261  *
262  * \param typename [in] obd device type string.
263  * \param name     [in] obd device name.
264  *
265  * \retval NULL if create fails, otherwise return the obd device
266  *         pointer created.
267  */
268 struct obd_device *class_newdev(const char *type_name, const char *name)
269 {
270         struct obd_device *result = NULL;
271         struct obd_device *newdev;
272         struct obd_type *type = NULL;
273         int i;
274         int new_obd_minor = 0;
275
276         if (strlen(name) >= MAX_OBD_NAME) {
277                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
278                 RETURN(ERR_PTR(-EINVAL));
279         }
280
281         type = class_get_type(type_name);
282         if (type == NULL){
283                 CERROR("OBD: unknown type: %s\n", type_name);
284                 RETURN(ERR_PTR(-ENODEV));
285         }
286
287         newdev = obd_device_alloc();
288         if (newdev == NULL) {
289                 class_put_type(type);
290                 RETURN(ERR_PTR(-ENOMEM));
291         }
292         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
293
294         spin_lock(&obd_dev_lock);
295         for (i = 0; i < class_devno_max(); i++) {
296                 struct obd_device *obd = class_num2obd(i);
297                 if (obd && obd->obd_name &&
298                     (strcmp(name, obd->obd_name) == 0)) {
299                         CERROR("Device %s already exists, won't add\n", name);
300                         if (result) {
301                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
302                                          "%p obd_magic %08x != %08x\n", result,
303                                          result->obd_magic, OBD_DEVICE_MAGIC);
304                                 LASSERTF(result->obd_minor == new_obd_minor,
305                                          "%p obd_minor %d != %d\n", result,
306                                          result->obd_minor, new_obd_minor);
307
308                                 obd_devs[result->obd_minor] = NULL;
309                                 result->obd_name[0]='\0';
310                          }
311                         result = ERR_PTR(-EEXIST);
312                         break;
313                 }
314                 if (!result && !obd) {
315                         result = newdev;
316                         result->obd_minor = i;
317                         new_obd_minor = i;
318                         result->obd_type = type;
319                         strncpy(result->obd_name, name,
320                                 sizeof(result->obd_name) - 1);
321                         obd_devs[i] = result;
322                 }
323         }
324         spin_unlock(&obd_dev_lock);
325
326         if (result == NULL && i >= class_devno_max()) {
327                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
328                        class_devno_max());
329                 result = ERR_PTR(-EOVERFLOW);
330         }
331
332         if (IS_ERR(result)) {
333                 obd_device_free(newdev);
334                 class_put_type(type);
335         } else {
336                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
337                        result->obd_name, result);
338         }
339         return result;
340 }
341
342 void class_release_dev(struct obd_device *obd)
343 {
344         struct obd_type *obd_type = obd->obd_type;
345
346         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
347                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
348         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
349                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
350         LASSERT(obd_type != NULL);
351
352         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
353                obd->obd_name,obd->obd_type->typ_name);
354
355         spin_lock(&obd_dev_lock);
356         obd_devs[obd->obd_minor] = NULL;
357         spin_unlock(&obd_dev_lock);
358         obd_device_free(obd);
359
360         class_put_type(obd_type);
361 }
362
363 int class_name2dev(const char *name)
364 {
365         int i;
366
367         if (!name)
368                 return -1;
369
370         spin_lock(&obd_dev_lock);
371         for (i = 0; i < class_devno_max(); i++) {
372                 struct obd_device *obd = class_num2obd(i);
373                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
374                         /* Make sure we finished attaching before we give
375                            out any references */
376                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
377                         if (obd->obd_attached) {
378                                 spin_unlock(&obd_dev_lock);
379                                 return i;
380                         }
381                         break;
382                 }
383         }
384         spin_unlock(&obd_dev_lock);
385
386         return -1;
387 }
388
389 struct obd_device *class_name2obd(const char *name)
390 {
391         int dev = class_name2dev(name);
392
393         if (dev < 0 || dev > class_devno_max())
394                 return NULL;
395         return class_num2obd(dev);
396 }
397
398 int class_uuid2dev(struct obd_uuid *uuid)
399 {
400         int i;
401
402         spin_lock(&obd_dev_lock);
403         for (i = 0; i < class_devno_max(); i++) {
404                 struct obd_device *obd = class_num2obd(i);
405                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
406                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
407                         spin_unlock(&obd_dev_lock);
408                         return i;
409                 }
410         }
411         spin_unlock(&obd_dev_lock);
412
413         return -1;
414 }
415
416 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
417 {
418         int dev = class_uuid2dev(uuid);
419         if (dev < 0)
420                 return NULL;
421         return class_num2obd(dev);
422 }
423
424 /**
425  * Get obd device from ::obd_devs[]
426  *
427  * \param num [in] array index
428  *
429  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
430  *         otherwise return the obd device there.
431  */
432 struct obd_device *class_num2obd(int num)
433 {
434         struct obd_device *obd = NULL;
435
436         if (num < class_devno_max()) {
437                 obd = obd_devs[num];
438                 if (obd == NULL)
439                         return NULL;
440
441                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
442                          "%p obd_magic %08x != %08x\n",
443                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
444                 LASSERTF(obd->obd_minor == num,
445                          "%p obd_minor %0d != %0d\n",
446                          obd, obd->obd_minor, num);
447         }
448
449         return obd;
450 }
451
452 void class_obd_list(void)
453 {
454         char *status;
455         int i;
456
457         spin_lock(&obd_dev_lock);
458         for (i = 0; i < class_devno_max(); i++) {
459                 struct obd_device *obd = class_num2obd(i);
460                 if (obd == NULL)
461                         continue;
462                 if (obd->obd_stopping)
463                         status = "ST";
464                 else if (obd->obd_set_up)
465                         status = "UP";
466                 else if (obd->obd_attached)
467                         status = "AT";
468                 else
469                         status = "--";
470                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
471                          i, status, obd->obd_type->typ_name,
472                          obd->obd_name, obd->obd_uuid.uuid,
473                          atomic_read(&obd->obd_refcount));
474         }
475         spin_unlock(&obd_dev_lock);
476         return;
477 }
478
479 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
480    specified, then only the client with that uuid is returned,
481    otherwise any client connected to the tgt is returned. */
482 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
483                                           const char * typ_name,
484                                           struct obd_uuid *grp_uuid)
485 {
486         int i;
487
488         spin_lock(&obd_dev_lock);
489         for (i = 0; i < class_devno_max(); i++) {
490                 struct obd_device *obd = class_num2obd(i);
491                 if (obd == NULL)
492                         continue;
493                 if ((strncmp(obd->obd_type->typ_name, typ_name,
494                              strlen(typ_name)) == 0)) {
495                         if (obd_uuid_equals(tgt_uuid,
496                                             &obd->u.cli.cl_target_uuid) &&
497                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
498                                                          &obd->obd_uuid) : 1)) {
499                                 spin_unlock(&obd_dev_lock);
500                                 return obd;
501                         }
502                 }
503         }
504         spin_unlock(&obd_dev_lock);
505
506         return NULL;
507 }
508
509 /* Iterate the obd_device list looking devices have grp_uuid. Start
510    searching at *next, and if a device is found, the next index to look
511    at is saved in *next. If next is NULL, then the first matching device
512    will always be returned. */
513 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
514 {
515         int i;
516
517         if (next == NULL)
518                 i = 0;
519         else if (*next >= 0 && *next < class_devno_max())
520                 i = *next;
521         else
522                 return NULL;
523
524         spin_lock(&obd_dev_lock);
525         for (; i < class_devno_max(); i++) {
526                 struct obd_device *obd = class_num2obd(i);
527                 if (obd == NULL)
528                         continue;
529                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
530                         if (next != NULL)
531                                 *next = i+1;
532                         spin_unlock(&obd_dev_lock);
533                         return obd;
534                 }
535         }
536         spin_unlock(&obd_dev_lock);
537
538         return NULL;
539 }
540
541 /**
542  * to notify sptlrpc log for @fsname has changed, let every relevant OBD
543  * adjust sptlrpc settings accordingly.
544  */
545 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
546 {
547         struct obd_device  *obd;
548         const char         *type;
549         int                 i, rc = 0, rc2;
550
551         LASSERT(namelen > 0);
552
553         spin_lock(&obd_dev_lock);
554         for (i = 0; i < class_devno_max(); i++) {
555                 obd = class_num2obd(i);
556
557                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
558                         continue;
559
560                 /* only notify mdc, osc, mdt, ost */
561                 type = obd->obd_type->typ_name;
562                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
563                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
564                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
565                     strcmp(type, LUSTRE_OST_NAME) != 0)
566                         continue;
567
568                 if (strncmp(obd->obd_name, fsname, namelen))
569                         continue;
570
571                 class_incref(obd, __FUNCTION__, obd);
572                 spin_unlock(&obd_dev_lock);
573                 rc2 = obd_set_info_async(obd->obd_self_export,
574                                          sizeof(KEY_SPTLRPC_CONF),
575                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
576                 rc = rc ? rc : rc2;
577                 class_decref(obd, __FUNCTION__, obd);
578                 spin_lock(&obd_dev_lock);
579         }
580         spin_unlock(&obd_dev_lock);
581         return rc;
582 }
583 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
584
585 void obd_cleanup_caches(void)
586 {
587         int rc;
588
589         ENTRY;
590         if (obd_device_cachep) {
591                 rc = cfs_mem_cache_destroy(obd_device_cachep);
592                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
593                 obd_device_cachep = NULL;
594         }
595         if (obdo_cachep) {
596                 rc = cfs_mem_cache_destroy(obdo_cachep);
597                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
598                 obdo_cachep = NULL;
599         }
600         if (import_cachep) {
601                 rc = cfs_mem_cache_destroy(import_cachep);
602                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
603                 import_cachep = NULL;
604         }
605         if (capa_cachep) {
606                 rc = cfs_mem_cache_destroy(capa_cachep);
607                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
608                 capa_cachep = NULL;
609         }
610         EXIT;
611 }
612
613 int obd_init_caches(void)
614 {
615         ENTRY;
616
617         LASSERT(obd_device_cachep == NULL);
618         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
619                                                  sizeof(struct obd_device),
620                                                  0, 0);
621         if (!obd_device_cachep)
622                 GOTO(out, -ENOMEM);
623
624         LASSERT(obdo_cachep == NULL);
625         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
626                                            0, 0);
627         if (!obdo_cachep)
628                 GOTO(out, -ENOMEM);
629
630         LASSERT(import_cachep == NULL);
631         import_cachep = cfs_mem_cache_create("ll_import_cache",
632                                              sizeof(struct obd_import),
633                                              0, 0);
634         if (!import_cachep)
635                 GOTO(out, -ENOMEM);
636
637         LASSERT(capa_cachep == NULL);
638         capa_cachep = cfs_mem_cache_create("capa_cache",
639                                            sizeof(struct obd_capa), 0, 0);
640         if (!capa_cachep)
641                 GOTO(out, -ENOMEM);
642
643         RETURN(0);
644  out:
645         obd_cleanup_caches();
646         RETURN(-ENOMEM);
647
648 }
649
650 /* map connection to client */
651 struct obd_export *class_conn2export(struct lustre_handle *conn)
652 {
653         struct obd_export *export;
654         ENTRY;
655
656         if (!conn) {
657                 CDEBUG(D_CACHE, "looking for null handle\n");
658                 RETURN(NULL);
659         }
660
661         if (conn->cookie == -1) {  /* this means assign a new connection */
662                 CDEBUG(D_CACHE, "want a new connection\n");
663                 RETURN(NULL);
664         }
665
666         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
667         export = class_handle2object(conn->cookie);
668         RETURN(export);
669 }
670
671 struct obd_device *class_exp2obd(struct obd_export *exp)
672 {
673         if (exp)
674                 return exp->exp_obd;
675         return NULL;
676 }
677
678 struct obd_device *class_conn2obd(struct lustre_handle *conn)
679 {
680         struct obd_export *export;
681         export = class_conn2export(conn);
682         if (export) {
683                 struct obd_device *obd = export->exp_obd;
684                 class_export_put(export);
685                 return obd;
686         }
687         return NULL;
688 }
689
690 struct obd_import *class_exp2cliimp(struct obd_export *exp)
691 {
692         struct obd_device *obd = exp->exp_obd;
693         if (obd == NULL)
694                 return NULL;
695         return obd->u.cli.cl_import;
696 }
697
698 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
699 {
700         struct obd_device *obd = class_conn2obd(conn);
701         if (obd == NULL)
702                 return NULL;
703         return obd->u.cli.cl_import;
704 }
705
/* Export management functions */

/* Addref callback handed to class_handle_hash() for exports: takes one
 * reference on the export passed as an untyped pointer. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
711
712 struct obd_export *class_export_get(struct obd_export *exp)
713 {
714         atomic_inc(&exp->exp_refcount);
715         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
716                atomic_read(&exp->exp_refcount));
717         return exp;
718 }
719 EXPORT_SYMBOL(class_export_get);
720
721 void class_export_put(struct obd_export *exp)
722 {
723         LASSERT(exp != NULL);
724         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
725                atomic_read(&exp->exp_refcount) - 1);
726         LASSERT(atomic_read(&exp->exp_refcount) > 0);
727         LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
728
729         if (atomic_dec_and_test(&exp->exp_refcount)) {
730                 LASSERT (list_empty(&exp->exp_obd_chain));
731
732                 CDEBUG(D_IOCTL, "final put %p/%s\n",
733                        exp, exp->exp_client_uuid.uuid);
734
735                 spin_lock(&obd_zombie_impexp_lock);
736                 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
737                 spin_unlock(&obd_zombie_impexp_lock);
738
739                 if (obd_zombie_impexp_notify != NULL)
740                         obd_zombie_impexp_notify();
741         }
742 }
743 EXPORT_SYMBOL(class_export_put);
744
745 static void class_export_destroy(struct obd_export *exp)
746 {
747         struct obd_device *obd = exp->exp_obd;
748         ENTRY;
749
750         LASSERT (atomic_read(&exp->exp_refcount) == 0);
751
752         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
753                exp->exp_client_uuid.uuid, obd->obd_name);
754
755         LASSERT(obd != NULL);
756
757         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
758         if (exp->exp_connection)
759                 ptlrpc_put_connection_superhack(exp->exp_connection);
760
761         LASSERT(list_empty(&exp->exp_outstanding_replies));
762         LASSERT(list_empty(&exp->exp_req_replay_queue));
763         LASSERT(list_empty(&exp->exp_queued_rpc));
764         obd_destroy_export(exp);
765         class_decref(obd, "export", exp);
766
767         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
768         EXIT;
769 }
770
771 /* Creates a new export, adds it to the hash table, and returns a
772  * pointer to it. The refcount is 2: one for the hash reference, and
773  * one for the pointer returned by this function. */
774 struct obd_export *class_new_export(struct obd_device *obd,
775                                     struct obd_uuid *cluuid)
776 {
777         struct obd_export *export;
778         int rc = 0;
779
780         OBD_ALLOC_PTR(export);
781         if (!export)
782                 return ERR_PTR(-ENOMEM);
783
784         export->exp_conn_cnt = 0;
785         export->exp_lock_hash = NULL;
786         atomic_set(&export->exp_refcount, 2);
787         atomic_set(&export->exp_rpc_count, 0);
788         export->exp_obd = obd;
789         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
790         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
791         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
792         CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
793         class_handle_hash(&export->exp_handle, export_handle_addref);
794         export->exp_last_request_time = cfs_time_current_sec();
795         spin_lock_init(&export->exp_lock);
796         INIT_HLIST_NODE(&export->exp_uuid_hash);
797         INIT_HLIST_NODE(&export->exp_nid_hash);
798
799         export->exp_sp_peer = LUSTRE_SP_ANY;
800         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
801         export->exp_client_uuid = *cluuid;
802         obd_init_export(export);
803
804         spin_lock(&obd->obd_dev_lock);
805         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
806                 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
807                                             &export->exp_uuid_hash);
808                 if (rc != 0) {
809                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
810                                       obd->obd_name, cluuid->uuid, rc);
811                         spin_unlock(&obd->obd_dev_lock);
812                         class_handle_unhash(&export->exp_handle);
813                         OBD_FREE_PTR(export);
814                         return ERR_PTR(-EALREADY);
815                 }
816         }
817
818         LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
819         class_incref(obd, "export", export);
820         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
821         list_add_tail(&export->exp_obd_chain_timed,
822                       &export->exp_obd->obd_exports_timed);
823         export->exp_obd->obd_num_exports++;
824         spin_unlock(&obd->obd_dev_lock);
825
826         return export;
827 }
828 EXPORT_SYMBOL(class_new_export);
829
830 void class_unlink_export(struct obd_export *exp)
831 {
832         class_handle_unhash(&exp->exp_handle);
833
834         spin_lock(&exp->exp_obd->obd_dev_lock);
835         /* delete an uuid-export hashitem from hashtables */
836         if (!hlist_unhashed(&exp->exp_uuid_hash))
837                 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
838                                 &exp->exp_client_uuid,
839                                 &exp->exp_uuid_hash);
840
841         list_del_init(&exp->exp_obd_chain);
842         list_del_init(&exp->exp_obd_chain_timed);
843         exp->exp_obd->obd_num_exports--;
844         spin_unlock(&exp->exp_obd->obd_dev_lock);
845
846         class_export_put(exp);
847 }
848 EXPORT_SYMBOL(class_unlink_export);
849
/* Import management functions */

/* Addref helper for imports: takes one reference on the import passed as
 * an untyped pointer. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
855
856 struct obd_import *class_import_get(struct obd_import *import)
857 {
858         LASSERT(atomic_read(&import->imp_refcount) >= 0);
859         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
860         atomic_inc(&import->imp_refcount);
861         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
862                atomic_read(&import->imp_refcount), 
863                import->imp_obd->obd_name);
864         return import;
865 }
866 EXPORT_SYMBOL(class_import_get);
867
868 void class_import_put(struct obd_import *import)
869 {
870         ENTRY;
871
872         LASSERT(atomic_read(&import->imp_refcount) > 0);
873         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
874         LASSERT(list_empty(&import->imp_zombie_chain));
875
876         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
877                atomic_read(&import->imp_refcount) - 1, 
878                import->imp_obd->obd_name);
879
880         if (atomic_dec_and_test(&import->imp_refcount)) {
881                 CDEBUG(D_INFO, "final put import %p\n", import);
882                 spin_lock(&obd_zombie_impexp_lock);
883                 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
884                 spin_unlock(&obd_zombie_impexp_lock);
885
886                 if (obd_zombie_impexp_notify != NULL)
887                         obd_zombie_impexp_notify();
888         }
889
890         EXIT;
891 }
892 EXPORT_SYMBOL(class_import_put);
893
894 void class_import_destroy(struct obd_import *import)
895 {
896         ENTRY;
897
898         CDEBUG(D_IOCTL, "destroying import %p for %s\n", import,
899                 import->imp_obd->obd_name);
900
901         LASSERT(atomic_read(&import->imp_refcount) == 0);
902
903         ptlrpc_put_connection_superhack(import->imp_connection);
904
905         while (!list_empty(&import->imp_conn_list)) {
906                 struct obd_import_conn *imp_conn;
907
908                 imp_conn = list_entry(import->imp_conn_list.next,
909                                       struct obd_import_conn, oic_item);
910                 list_del(&imp_conn->oic_item);
911                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
912                 OBD_FREE(imp_conn, sizeof(*imp_conn));
913         }
914
915         LASSERT(import->imp_sec == NULL);
916         class_decref(import->imp_obd, "import", import);
917         OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
918         EXIT;
919 }
920
921 static void init_imp_at(struct imp_at *at) {
922         int i;
923         at_init(&at->iat_net_latency, 0, 0);
924         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
925                 /* max service estimates are tracked on the server side, so
926                    don't use the AT history here, just use the last reported
927                    val. (But keep hist for proc histogram, worst_ever) */
928                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
929                         AT_FLG_NOHIST);
930         }
931 }
932
933 struct obd_import *class_new_import(struct obd_device *obd)
934 {
935         struct obd_import *imp;
936
937         OBD_ALLOC(imp, sizeof(*imp));
938         if (imp == NULL)
939                 return NULL;
940
941         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
942         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
943         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
944         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
945         spin_lock_init(&imp->imp_lock);
946         imp->imp_last_success_conn = 0;
947         imp->imp_state = LUSTRE_IMP_NEW;
948         imp->imp_obd = class_incref(obd, "import", imp);
949         sema_init(&imp->imp_sec_mutex, 1);
950         cfs_waitq_init(&imp->imp_recovery_waitq);
951
952         atomic_set(&imp->imp_refcount, 2);
953         atomic_set(&imp->imp_unregistering, 0);
954         atomic_set(&imp->imp_inflight, 0);
955         atomic_set(&imp->imp_replay_inflight, 0);
956         atomic_set(&imp->imp_inval_count, 0);
957         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
958         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
959         class_handle_hash(&imp->imp_handle, import_handle_addref);
960         init_imp_at(&imp->imp_at);
961
962         /* the default magic is V2, will be used in connect RPC, and
963          * then adjusted according to the flags in request/reply. */
964         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
965
966         return imp;
967 }
968 EXPORT_SYMBOL(class_new_import);
969
970 void class_destroy_import(struct obd_import *import)
971 {
972         LASSERT(import != NULL);
973         LASSERT(import != LP_POISON);
974
975         class_handle_unhash(&import->imp_handle);
976
977         spin_lock(&import->imp_lock);
978         import->imp_generation++;
979         spin_unlock(&import->imp_lock);
980         class_import_put(import);
981 }
982 EXPORT_SYMBOL(class_destroy_import);
983
984 /* A connection defines an export context in which preallocation can
985    be managed. This releases the export pointer reference, and returns
986    the export handle, so the export refcount is 1 when this function
987    returns. */
988 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
989                   struct obd_uuid *cluuid)
990 {
991         struct obd_export *export;
992         LASSERT(conn != NULL);
993         LASSERT(obd != NULL);
994         LASSERT(cluuid != NULL);
995         ENTRY;
996
997         export = class_new_export(obd, cluuid);
998         if (IS_ERR(export))
999                 RETURN(PTR_ERR(export));
1000
1001         conn->cookie = export->exp_handle.h_cookie;
1002         class_export_put(export);
1003
1004         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1005                cluuid->uuid, conn->cookie);
1006         RETURN(0);
1007 }
1008 EXPORT_SYMBOL(class_connect);
1009
1010 /* if export is involved in recovery then clean up related things */
1011 void class_export_recovery_cleanup(struct obd_export *exp)
1012 {
1013         struct obd_device *obd = exp->exp_obd;
1014
1015         spin_lock_bh(&obd->obd_processing_task_lock);
1016         if (obd->obd_recovering && exp->exp_in_recovery) {
1017                 spin_lock(&exp->exp_lock);
1018                 exp->exp_in_recovery = 0;
1019                 spin_unlock(&exp->exp_lock);
1020                 obd->obd_connected_clients--;
1021                 /* each connected client is counted as recoverable */
1022                 obd->obd_recoverable_clients--;
1023                 if (exp->exp_req_replay_needed) {
1024                         spin_lock(&exp->exp_lock);
1025                         exp->exp_req_replay_needed = 0;
1026                         spin_unlock(&exp->exp_lock);
1027                         LASSERT(atomic_read(&obd->obd_req_replay_clients));
1028                         atomic_dec(&obd->obd_req_replay_clients);
1029                 }
1030                 if (exp->exp_lock_replay_needed) {
1031                         spin_lock(&exp->exp_lock);
1032                         exp->exp_lock_replay_needed = 0;
1033                         spin_unlock(&exp->exp_lock);
1034                         LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1035                         atomic_dec(&obd->obd_lock_replay_clients);
1036                 }
1037         }
1038         spin_unlock_bh(&obd->obd_processing_task_lock);
1039 }
1040
1041 /* This function removes two references from the export: one for the
1042  * hash entry and one for the export pointer passed in.  The export
1043  * pointer passed to this function is destroyed should not be used
1044  * again. */
1045 int class_disconnect(struct obd_export *export)
1046 {
1047         int already_disconnected;
1048         ENTRY;
1049
1050         if (export == NULL) {
1051                 fixme();
1052                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1053                 RETURN(-EINVAL);
1054         }
1055
1056         spin_lock(&export->exp_lock);
1057         already_disconnected = export->exp_disconnected;
1058         export->exp_disconnected = 1;
1059
1060         if (!hlist_unhashed(&export->exp_nid_hash))
1061                 lustre_hash_del(export->exp_obd->obd_nid_hash,
1062                                 &export->exp_connection->c_peer.nid,
1063                                 &export->exp_nid_hash);
1064
1065         spin_unlock(&export->exp_lock);
1066
1067         /* class_cleanup(), abort_recovery(), and class_fail_export()
1068          * all end up in here, and if any of them race we shouldn't
1069          * call extra class_export_puts(). */
1070         if (already_disconnected)
1071                 RETURN(0);
1072
1073         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1074                export->exp_handle.h_cookie);
1075
1076         class_export_recovery_cleanup(export);
1077         class_unlink_export(export);
1078         class_export_put(export);
1079         RETURN(0);
1080 }
1081
1082 static void class_disconnect_export_list(struct list_head *list, int flags)
1083 {
1084         int rc;
1085         struct lustre_handle fake_conn;
1086         struct obd_export *fake_exp, *exp;
1087         ENTRY;
1088
1089         /* It's possible that an export may disconnect itself, but
1090          * nothing else will be added to this list. */
1091         while (!list_empty(list)) {
1092                 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1093                 class_export_get(exp);
1094
1095                 spin_lock(&exp->exp_lock);
1096                 exp->exp_flags = flags;
1097                 spin_unlock(&exp->exp_lock);
1098
1099                 if (obd_uuid_equals(&exp->exp_client_uuid,
1100                                     &exp->exp_obd->obd_uuid)) {
1101                         CDEBUG(D_HA,
1102                                "exp %p export uuid == obd uuid, don't discon\n",
1103                                exp);
1104                         /* Need to delete this now so we don't end up pointing
1105                          * to work_list later when this export is cleaned up. */
1106                         list_del_init(&exp->exp_obd_chain);
1107                         class_export_put(exp);
1108                         continue;
1109                 }
1110
1111                 fake_conn.cookie = exp->exp_handle.h_cookie;
1112                 fake_exp = class_conn2export(&fake_conn);
1113                 if (!fake_exp) {
1114                         class_export_put(exp);
1115                         continue;
1116                 }
1117
1118                 spin_lock(&fake_exp->exp_lock);
1119                 fake_exp->exp_flags = flags;
1120                 spin_unlock(&fake_exp->exp_lock);
1121
1122                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1123                        "last request at "CFS_TIME_T"\n",
1124                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1125                        exp, exp->exp_last_request_time);
1126                 rc = obd_disconnect(fake_exp);
1127                 class_export_put(exp);
1128         }
1129         EXIT;
1130 }
1131
1132 static inline int get_exp_flags_from_obd(struct obd_device *obd)
1133 {
1134         return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
1135                 (obd->obd_force ? OBD_OPT_FORCE : 0));
1136 }
1137
1138 void class_disconnect_exports(struct obd_device *obd)
1139 {
1140         struct list_head work_list;
1141         ENTRY;
1142
1143         /* Move all of the exports from obd_exports to a work list, en masse. */
1144         spin_lock(&obd->obd_dev_lock);
1145         list_add(&work_list, &obd->obd_exports);
1146         list_del_init(&obd->obd_exports);
1147         spin_unlock(&obd->obd_dev_lock);
1148
1149         if (!list_empty(&work_list)) {
1150                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1151                        "disconnecting them\n", obd->obd_minor, obd);
1152                 class_disconnect_export_list(&work_list,
1153                                              get_exp_flags_from_obd(obd));
1154         } else
1155                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1156                        obd->obd_minor, obd);
1157         EXIT;
1158 }
1159 EXPORT_SYMBOL(class_disconnect_exports);
1160
1161 /* Remove exports that have not completed recovery.
1162  */
1163 int class_disconnect_stale_exports(struct obd_device *obd,
1164                                    int (*test_export)(struct obd_export *))
1165 {
1166         struct list_head work_list;
1167         struct list_head *pos, *n;
1168         struct obd_export *exp;
1169         int cnt = 0;
1170         ENTRY;
1171
1172         CFS_INIT_LIST_HEAD(&work_list);
1173         spin_lock(&obd->obd_dev_lock);
1174         list_for_each_safe(pos, n, &obd->obd_exports) {
1175                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1176                 if (test_export(exp))
1177                         continue;
1178
1179                 list_del(&exp->exp_obd_chain);
1180                 list_add(&exp->exp_obd_chain, &work_list);
1181                 /* don't count self-export as client */
1182                 if (obd_uuid_equals(&exp->exp_client_uuid,
1183                                      &exp->exp_obd->obd_uuid))
1184                         continue;
1185
1186                 cnt++;
1187                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1188                        obd->obd_name, exp->exp_client_uuid.uuid,
1189                        exp->exp_connection == NULL ? "<unknown>" :
1190                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1191         }
1192         spin_unlock(&obd->obd_dev_lock);
1193
1194         CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
1195                obd->obd_name, cnt);
1196         class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
1197         RETURN(cnt);
1198 }
1199 EXPORT_SYMBOL(class_disconnect_stale_exports);
1200
1201 void class_fail_export(struct obd_export *exp)
1202 {
1203         int rc, already_failed;
1204
1205         spin_lock(&exp->exp_lock);
1206         already_failed = exp->exp_failed;
1207         exp->exp_failed = 1;
1208         spin_unlock(&exp->exp_lock);
1209
1210         if (already_failed) {
1211                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1212                        exp, exp->exp_client_uuid.uuid);
1213                 return;
1214         }
1215
1216         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1217                exp, exp->exp_client_uuid.uuid);
1218
1219         if (obd_dump_on_timeout)
1220                 libcfs_debug_dumplog();
1221
1222         /* Most callers into obd_disconnect are removing their own reference
1223          * (request, for example) in addition to the one from the hash table.
1224          * We don't have such a reference here, so make one. */
1225         class_export_get(exp);
1226         rc = obd_disconnect(exp);
1227         if (rc)
1228                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1229         else
1230                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1231                        exp, exp->exp_client_uuid.uuid);
1232 }
1233 EXPORT_SYMBOL(class_fail_export);
1234
1235 char *obd_export_nid2str(struct obd_export *exp)
1236 {
1237         if (exp->exp_connection != NULL)
1238                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1239
1240         return "(no nid)";
1241 }
1242 EXPORT_SYMBOL(obd_export_nid2str);
1243
1244 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1245 {
1246         struct obd_export *doomed_exp = NULL;
1247         int exports_evicted = 0;
1248
1249         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1250
1251         do {
1252                 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1253                 if (doomed_exp == NULL)
1254                         break;
1255
1256                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1257                          "nid %s found, wanted nid %s, requested nid %s\n",
1258                          obd_export_nid2str(doomed_exp),
1259                          libcfs_nid2str(nid_key), nid);
1260                 LASSERTF(doomed_exp != obd->obd_self_export,
1261                          "self-export is hashed by NID?\n");
1262                 exports_evicted++;
1263                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1264                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1265                        exports_evicted);
1266                 class_fail_export(doomed_exp);
1267                 class_export_put(doomed_exp);
1268         } while (1);
1269
1270         if (!exports_evicted)
1271                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1272                        obd->obd_name, nid);
1273         return exports_evicted;
1274 }
1275 EXPORT_SYMBOL(obd_export_evict_by_nid);
1276
1277 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1278 {
1279         struct obd_export *doomed_exp = NULL;
1280         struct obd_uuid doomed_uuid;
1281         int exports_evicted = 0;
1282
1283         obd_str2uuid(&doomed_uuid, uuid);
1284         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1285                 CERROR("%s: can't evict myself\n", obd->obd_name);
1286                 return exports_evicted;
1287         }
1288
1289         doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1290
1291         if (doomed_exp == NULL) {
1292                 CERROR("%s: can't disconnect %s: no exports found\n",
1293                        obd->obd_name, uuid);
1294         } else {
1295                 CWARN("%s: evicting %s at adminstrative request\n",
1296                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1297                 class_fail_export(doomed_exp);
1298                 class_export_put(doomed_exp);
1299                 exports_evicted++;
1300         }
1301
1302         return exports_evicted;
1303 }
1304 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1305
1306 /**
1307  * kill zombie imports and exports
1308  */
1309 void obd_zombie_impexp_cull(void)
1310 {
1311         struct obd_import *import;
1312         struct obd_export *export;
1313         ENTRY;
1314
1315         do {
1316                 spin_lock (&obd_zombie_impexp_lock);
1317
1318                 import = NULL;
1319                 if (!list_empty(&obd_zombie_imports)) {
1320                         import = list_entry(obd_zombie_imports.next,
1321                                             struct obd_import,
1322                                             imp_zombie_chain);
1323                         list_del(&import->imp_zombie_chain);
1324                 }
1325
1326                 export = NULL;
1327                 if (!list_empty(&obd_zombie_exports)) {
1328                         export = list_entry(obd_zombie_exports.next,
1329                                             struct obd_export,
1330                                             exp_obd_chain);
1331                         list_del_init(&export->exp_obd_chain);
1332                 }
1333
1334                 spin_unlock(&obd_zombie_impexp_lock);
1335
1336                 if (import != NULL)
1337                         class_import_destroy(import);
1338
1339                 if (export != NULL)
1340                         class_export_destroy(export);
1341
1342         } while (import != NULL || export != NULL);
1343         EXIT;
1344 }
1345
1346 static struct completion        obd_zombie_start;
1347 static struct completion        obd_zombie_stop;
1348 static unsigned long            obd_zombie_flags;
1349 static cfs_waitq_t              obd_zombie_waitq;
1350
1351 enum {
1352         OBD_ZOMBIE_STOP = 1
1353 };
1354
1355 /**
1356  * check for work for kill zombie import/export thread.
1357  */
1358 static int obd_zombie_impexp_check(void *arg)
1359 {
1360         int rc;
1361
1362         spin_lock(&obd_zombie_impexp_lock);
1363         rc = list_empty(&obd_zombie_imports) &&
1364              list_empty(&obd_zombie_exports) &&
1365              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1366
1367         spin_unlock(&obd_zombie_impexp_lock);
1368
1369         RETURN(rc);
1370 }
1371
1372 /**
1373  * notify import/export destroy thread about new zombie.
1374  */
1375 static void obd_zombie_impexp_notify(void)
1376 {
1377         cfs_waitq_signal(&obd_zombie_waitq);
1378 }
1379
1380 /**
1381  * check whether obd_zombie is idle
1382  */
1383 static int obd_zombie_is_idle(void)
1384 {
1385         int rc;
1386
1387         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1388         spin_lock(&obd_zombie_impexp_lock);
1389         rc = list_empty(&obd_zombie_imports) &&
1390              list_empty(&obd_zombie_exports);
1391         spin_unlock(&obd_zombie_impexp_lock);
1392         return rc;
1393 }
1394
1395 /**
1396  * wait when obd_zombie import/export queues become empty
1397  */
1398 void obd_zombie_barrier(void)
1399 {
1400         struct l_wait_info lwi = { 0 };
1401
1402         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1403 }
1404 EXPORT_SYMBOL(obd_zombie_barrier);
1405
1406 #ifdef __KERNEL__
1407
1408 /**
1409  * destroy zombie export/import thread.
1410  */
1411 static int obd_zombie_impexp_thread(void *unused)
1412 {
1413         int rc;
1414
1415         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1416                 complete(&obd_zombie_start);
1417                 RETURN(rc);
1418         }
1419
1420         complete(&obd_zombie_start);
1421
1422         while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1423                 struct l_wait_info lwi = { 0 };
1424
1425                 l_wait_event(obd_zombie_waitq, !obd_zombie_impexp_check(NULL), &lwi);
1426
1427                 obd_zombie_impexp_cull();
1428                 /* Notify obd_zombie_barrier callers that queues may be empty */
1429                 cfs_waitq_signal(&obd_zombie_waitq);
1430         }
1431
1432         complete(&obd_zombie_stop);
1433
1434         RETURN(0);
1435 }
1436
1437 #else /* ! KERNEL */
1438
1439 static atomic_t zombie_recur = ATOMIC_INIT(0);
1440 static void *obd_zombie_impexp_work_cb;
1441 static void *obd_zombie_impexp_idle_cb;
1442
1443 int obd_zombie_impexp_kill(void *arg)
1444 {
1445         int rc = 0;
1446
1447         if (atomic_inc_return(&zombie_recur) == 1) {
1448                 obd_zombie_impexp_cull();
1449                 rc = 1;
1450         }
1451         atomic_dec(&zombie_recur);
1452         return rc;
1453 }
1454
1455 #endif
1456
1457 /**
1458  * start destroy zombie import/export thread
1459  */
1460 int obd_zombie_impexp_init(void)
1461 {
1462         int rc;
1463
1464         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1465         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1466         spin_lock_init(&obd_zombie_impexp_lock);
1467         init_completion(&obd_zombie_start);
1468         init_completion(&obd_zombie_stop);
1469         cfs_waitq_init(&obd_zombie_waitq);
1470
1471 #ifdef __KERNEL__
1472         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1473         if (rc < 0)
1474                 RETURN(rc);
1475
1476         wait_for_completion(&obd_zombie_start);
1477 #else
1478
1479         obd_zombie_impexp_work_cb =
1480                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1481                                                  &obd_zombie_impexp_kill, NULL);
1482
1483         obd_zombie_impexp_idle_cb =
1484                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1485                                                  &obd_zombie_impexp_check, NULL);
1486         rc = 0;
1487
1488 #endif
1489         RETURN(rc);
1490 }
1491 /**
1492  * stop destroy zombie import/export thread
1493  */
1494 void obd_zombie_impexp_stop(void)
1495 {
1496         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1497         obd_zombie_impexp_notify();
1498 #ifdef __KERNEL__
1499         wait_for_completion(&obd_zombie_stop);
1500 #else
1501         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1502         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1503 #endif
1504 }