Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
50
51 extern struct list_head obd_types;
52 spinlock_t obd_types_lock;
53
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
58
59 struct list_head  obd_zombie_imports;
60 struct list_head  obd_zombie_exports;
61 spinlock_t        obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
63 static void obd_zombie_export_add(struct obd_export *exp);
64 static void obd_zombie_import_add(struct obd_import *imp);
65
66 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67
68 /*
69  * support functions: we could use inter-module communication, but this
70  * is more portable to other OS's
71  */
72 static struct obd_device *obd_device_alloc(void)
73 {
74         struct obd_device *obd;
75
76         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
77         if (obd != NULL) {
78                 obd->obd_magic = OBD_DEVICE_MAGIC;
79         }
80         return obd;
81 }
82 EXPORT_SYMBOL(obd_device_alloc);
83
84 static void obd_device_free(struct obd_device *obd)
85 {
86         LASSERT(obd != NULL);
87         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
89         if (obd->obd_namespace != NULL) {
90                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91                        obd, obd->obd_namespace, obd->obd_force);
92                 LBUG();
93         }
94         lu_ref_fini(&obd->obd_reference);
95         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
96 }
97
98 struct obd_type *class_search_type(const char *name)
99 {
100         struct list_head *tmp;
101         struct obd_type *type;
102
103         spin_lock(&obd_types_lock);
104         list_for_each(tmp, &obd_types) {
105                 type = list_entry(tmp, struct obd_type, typ_chain);
106                 if (strcmp(type->typ_name, name) == 0) {
107                         spin_unlock(&obd_types_lock);
108                         return type;
109                 }
110         }
111         spin_unlock(&obd_types_lock);
112         return NULL;
113 }
114
115 struct obd_type *class_get_type(const char *name)
116 {
117         struct obd_type *type = class_search_type(name);
118
119 #ifdef CONFIG_KMOD
120         if (!type) {
121                 const char *modname = name;
122                 if (!request_module(modname)) {
123                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
124                         type = class_search_type(name);
125                 } else {
126                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
127                                            modname);
128                 }
129         }
130 #endif
131         if (type) {
132                 spin_lock(&type->obd_type_lock);
133                 type->typ_refcnt++;
134                 try_module_get(type->typ_dt_ops->o_owner);
135                 spin_unlock(&type->obd_type_lock);
136         }
137         return type;
138 }
139
140 void class_put_type(struct obd_type *type)
141 {
142         LASSERT(type);
143         spin_lock(&type->obd_type_lock);
144         type->typ_refcnt--;
145         module_put(type->typ_dt_ops->o_owner);
146         spin_unlock(&type->obd_type_lock);
147 }
148
149 #define CLASS_MAX_NAME 1024
150
151 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
152                         struct lprocfs_vars *vars, const char *name,
153                         struct lu_device_type *ldt)
154 {
155         struct obd_type *type;
156         int rc = 0;
157         ENTRY;
158
159         /* sanity check */
160         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
161
162         if (class_search_type(name)) {
163                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
164                 RETURN(-EEXIST);
165         }
166
167         rc = -ENOMEM;
168         OBD_ALLOC(type, sizeof(*type));
169         if (type == NULL)
170                 RETURN(rc);
171
172         OBD_ALLOC_PTR(type->typ_dt_ops);
173         OBD_ALLOC_PTR(type->typ_md_ops);
174         OBD_ALLOC(type->typ_name, strlen(name) + 1);
175
176         if (type->typ_dt_ops == NULL ||
177             type->typ_md_ops == NULL ||
178             type->typ_name == NULL)
179                 GOTO (failed, rc);
180
181         *(type->typ_dt_ops) = *dt_ops;
182         /* md_ops is optional */
183         if (md_ops)
184                 *(type->typ_md_ops) = *md_ops;
185         strcpy(type->typ_name, name);
186         spin_lock_init(&type->obd_type_lock);
187
188 #ifdef LPROCFS
189         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
190                                               vars, type);
191         if (IS_ERR(type->typ_procroot)) {
192                 rc = PTR_ERR(type->typ_procroot);
193                 type->typ_procroot = NULL;
194                 GOTO (failed, rc);
195         }
196 #endif
197         if (ldt != NULL) {
198                 type->typ_lu = ldt;
199                 rc = lu_device_type_init(ldt);
200                 if (rc != 0)
201                         GOTO (failed, rc);
202         }
203
204         spin_lock(&obd_types_lock);
205         list_add(&type->typ_chain, &obd_types);
206         spin_unlock(&obd_types_lock);
207
208         RETURN (0);
209
210  failed:
211         if (type->typ_name != NULL)
212                 OBD_FREE(type->typ_name, strlen(name) + 1);
213         if (type->typ_md_ops != NULL)
214                 OBD_FREE_PTR(type->typ_md_ops);
215         if (type->typ_dt_ops != NULL)
216                 OBD_FREE_PTR(type->typ_dt_ops);
217         OBD_FREE(type, sizeof(*type));
218         RETURN(rc);
219 }
220
221 int class_unregister_type(const char *name)
222 {
223         struct obd_type *type = class_search_type(name);
224         ENTRY;
225
226         if (!type) {
227                 CERROR("unknown obd type\n");
228                 RETURN(-EINVAL);
229         }
230
231         if (type->typ_refcnt) {
232                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
233                 /* This is a bad situation, let's make the best of it */
234                 /* Remove ops, but leave the name for debugging */
235                 OBD_FREE_PTR(type->typ_dt_ops);
236                 OBD_FREE_PTR(type->typ_md_ops);
237                 RETURN(-EBUSY);
238         }
239
240         if (type->typ_procroot) {
241                 lprocfs_remove(&type->typ_procroot);
242         }
243
244         if (type->typ_lu)
245                 lu_device_type_fini(type->typ_lu);
246
247         spin_lock(&obd_types_lock);
248         list_del(&type->typ_chain);
249         spin_unlock(&obd_types_lock);
250         OBD_FREE(type->typ_name, strlen(name) + 1);
251         if (type->typ_dt_ops != NULL)
252                 OBD_FREE_PTR(type->typ_dt_ops);
253         if (type->typ_md_ops != NULL)
254                 OBD_FREE_PTR(type->typ_md_ops);
255         OBD_FREE(type, sizeof(*type));
256         RETURN(0);
257 } /* class_unregister_type */
258
259 /**
260  * Create a new obd device.
261  *
262  * Find an empty slot in ::obd_devs[], create a new obd device in it.
263  *
264  * \param typename [in] obd device type string.
265  * \param name     [in] obd device name.
266  *
267  * \retval NULL if create fails, otherwise return the obd device
268  *         pointer created.
269  */
270 struct obd_device *class_newdev(const char *type_name, const char *name)
271 {
272         struct obd_device *result = NULL;
273         struct obd_device *newdev;
274         struct obd_type *type = NULL;
275         int i;
276         int new_obd_minor = 0;
277
278         if (strlen(name) >= MAX_OBD_NAME) {
279                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
280                 RETURN(ERR_PTR(-EINVAL));
281         }
282
283         type = class_get_type(type_name);
284         if (type == NULL){
285                 CERROR("OBD: unknown type: %s\n", type_name);
286                 RETURN(ERR_PTR(-ENODEV));
287         }
288
289         newdev = obd_device_alloc();
290         if (newdev == NULL) {
291                 class_put_type(type);
292                 RETURN(ERR_PTR(-ENOMEM));
293         }
294         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
295
296         spin_lock(&obd_dev_lock);
297         for (i = 0; i < class_devno_max(); i++) {
298                 struct obd_device *obd = class_num2obd(i);
299                 if (obd && obd->obd_name &&
300                     (strcmp(name, obd->obd_name) == 0)) {
301                         CERROR("Device %s already exists, won't add\n", name);
302                         if (result) {
303                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
304                                          "%p obd_magic %08x != %08x\n", result,
305                                          result->obd_magic, OBD_DEVICE_MAGIC);
306                                 LASSERTF(result->obd_minor == new_obd_minor,
307                                          "%p obd_minor %d != %d\n", result,
308                                          result->obd_minor, new_obd_minor);
309
310                                 obd_devs[result->obd_minor] = NULL;
311                                 result->obd_name[0]='\0';
312                          }
313                         result = ERR_PTR(-EEXIST);
314                         break;
315                 }
316                 if (!result && !obd) {
317                         result = newdev;
318                         result->obd_minor = i;
319                         new_obd_minor = i;
320                         result->obd_type = type;
321                         strncpy(result->obd_name, name,
322                                 sizeof(result->obd_name) - 1);
323                         obd_devs[i] = result;
324                 }
325         }
326         spin_unlock(&obd_dev_lock);
327
328         if (result == NULL && i >= class_devno_max()) {
329                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
330                        class_devno_max());
331                 result = ERR_PTR(-EOVERFLOW);
332         }
333
334         if (IS_ERR(result)) {
335                 obd_device_free(newdev);
336                 class_put_type(type);
337         } else {
338                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
339                        result->obd_name, result);
340         }
341         return result;
342 }
343
344 void class_release_dev(struct obd_device *obd)
345 {
346         struct obd_type *obd_type = obd->obd_type;
347
348         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
349                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
350         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
351                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
352         LASSERT(obd_type != NULL);
353
354         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
355                obd->obd_name,obd->obd_type->typ_name);
356
357         spin_lock(&obd_dev_lock);
358         obd_devs[obd->obd_minor] = NULL;
359         spin_unlock(&obd_dev_lock);
360         obd_device_free(obd);
361
362         class_put_type(obd_type);
363 }
364
365 int class_name2dev(const char *name)
366 {
367         int i;
368
369         if (!name)
370                 return -1;
371
372         spin_lock(&obd_dev_lock);
373         for (i = 0; i < class_devno_max(); i++) {
374                 struct obd_device *obd = class_num2obd(i);
375                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
376                         /* Make sure we finished attaching before we give
377                            out any references */
378                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
379                         if (obd->obd_attached) {
380                                 spin_unlock(&obd_dev_lock);
381                                 return i;
382                         }
383                         break;
384                 }
385         }
386         spin_unlock(&obd_dev_lock);
387
388         return -1;
389 }
390
391 struct obd_device *class_name2obd(const char *name)
392 {
393         int dev = class_name2dev(name);
394
395         if (dev < 0 || dev > class_devno_max())
396                 return NULL;
397         return class_num2obd(dev);
398 }
399
400 int class_uuid2dev(struct obd_uuid *uuid)
401 {
402         int i;
403
404         spin_lock(&obd_dev_lock);
405         for (i = 0; i < class_devno_max(); i++) {
406                 struct obd_device *obd = class_num2obd(i);
407                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
408                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
409                         spin_unlock(&obd_dev_lock);
410                         return i;
411                 }
412         }
413         spin_unlock(&obd_dev_lock);
414
415         return -1;
416 }
417
418 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
419 {
420         int dev = class_uuid2dev(uuid);
421         if (dev < 0)
422                 return NULL;
423         return class_num2obd(dev);
424 }
425
426 /**
427  * Get obd device from ::obd_devs[]
428  *
429  * \param num [in] array index
430  *
431  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
432  *         otherwise return the obd device there.
433  */
434 struct obd_device *class_num2obd(int num)
435 {
436         struct obd_device *obd = NULL;
437
438         if (num < class_devno_max()) {
439                 obd = obd_devs[num];
440                 if (obd == NULL)
441                         return NULL;
442
443                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
444                          "%p obd_magic %08x != %08x\n",
445                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
446                 LASSERTF(obd->obd_minor == num,
447                          "%p obd_minor %0d != %0d\n",
448                          obd, obd->obd_minor, num);
449         }
450
451         return obd;
452 }
453
454 void class_obd_list(void)
455 {
456         char *status;
457         int i;
458
459         spin_lock(&obd_dev_lock);
460         for (i = 0; i < class_devno_max(); i++) {
461                 struct obd_device *obd = class_num2obd(i);
462                 if (obd == NULL)
463                         continue;
464                 if (obd->obd_stopping)
465                         status = "ST";
466                 else if (obd->obd_set_up)
467                         status = "UP";
468                 else if (obd->obd_attached)
469                         status = "AT";
470                 else
471                         status = "--";
472                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
473                          i, status, obd->obd_type->typ_name,
474                          obd->obd_name, obd->obd_uuid.uuid,
475                          atomic_read(&obd->obd_refcount));
476         }
477         spin_unlock(&obd_dev_lock);
478         return;
479 }
480
481 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
482    specified, then only the client with that uuid is returned,
483    otherwise any client connected to the tgt is returned. */
484 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
485                                           const char * typ_name,
486                                           struct obd_uuid *grp_uuid)
487 {
488         int i;
489
490         spin_lock(&obd_dev_lock);
491         for (i = 0; i < class_devno_max(); i++) {
492                 struct obd_device *obd = class_num2obd(i);
493                 if (obd == NULL)
494                         continue;
495                 if ((strncmp(obd->obd_type->typ_name, typ_name,
496                              strlen(typ_name)) == 0)) {
497                         if (obd_uuid_equals(tgt_uuid,
498                                             &obd->u.cli.cl_target_uuid) &&
499                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
500                                                          &obd->obd_uuid) : 1)) {
501                                 spin_unlock(&obd_dev_lock);
502                                 return obd;
503                         }
504                 }
505         }
506         spin_unlock(&obd_dev_lock);
507
508         return NULL;
509 }
510
511 /* Iterate the obd_device list looking devices have grp_uuid. Start
512    searching at *next, and if a device is found, the next index to look
513    at is saved in *next. If next is NULL, then the first matching device
514    will always be returned. */
515 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
516 {
517         int i;
518
519         if (next == NULL)
520                 i = 0;
521         else if (*next >= 0 && *next < class_devno_max())
522                 i = *next;
523         else
524                 return NULL;
525
526         spin_lock(&obd_dev_lock);
527         for (; i < class_devno_max(); i++) {
528                 struct obd_device *obd = class_num2obd(i);
529                 if (obd == NULL)
530                         continue;
531                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
532                         if (next != NULL)
533                                 *next = i+1;
534                         spin_unlock(&obd_dev_lock);
535                         return obd;
536                 }
537         }
538         spin_unlock(&obd_dev_lock);
539
540         return NULL;
541 }
542
543 /**
544  * to notify sptlrpc log for @fsname has changed, let every relevant OBD
545  * adjust sptlrpc settings accordingly.
546  */
547 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
548 {
549         struct obd_device  *obd;
550         const char         *type;
551         int                 i, rc = 0, rc2;
552
553         LASSERT(namelen > 0);
554
555         spin_lock(&obd_dev_lock);
556         for (i = 0; i < class_devno_max(); i++) {
557                 obd = class_num2obd(i);
558
559                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
560                         continue;
561
562                 /* only notify mdc, osc, mdt, ost */
563                 type = obd->obd_type->typ_name;
564                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
565                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
566                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
567                     strcmp(type, LUSTRE_OST_NAME) != 0)
568                         continue;
569
570                 if (strncmp(obd->obd_name, fsname, namelen))
571                         continue;
572
573                 class_incref(obd, __FUNCTION__, obd);
574                 spin_unlock(&obd_dev_lock);
575                 rc2 = obd_set_info_async(obd->obd_self_export,
576                                          sizeof(KEY_SPTLRPC_CONF),
577                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
578                 rc = rc ? rc : rc2;
579                 class_decref(obd, __FUNCTION__, obd);
580                 spin_lock(&obd_dev_lock);
581         }
582         spin_unlock(&obd_dev_lock);
583         return rc;
584 }
585 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
586
587 void obd_cleanup_caches(void)
588 {
589         int rc;
590
591         ENTRY;
592         if (obd_device_cachep) {
593                 rc = cfs_mem_cache_destroy(obd_device_cachep);
594                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
595                 obd_device_cachep = NULL;
596         }
597         if (obdo_cachep) {
598                 rc = cfs_mem_cache_destroy(obdo_cachep);
599                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
600                 obdo_cachep = NULL;
601         }
602         if (import_cachep) {
603                 rc = cfs_mem_cache_destroy(import_cachep);
604                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
605                 import_cachep = NULL;
606         }
607         if (capa_cachep) {
608                 rc = cfs_mem_cache_destroy(capa_cachep);
609                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
610                 capa_cachep = NULL;
611         }
612         EXIT;
613 }
614
615 int obd_init_caches(void)
616 {
617         ENTRY;
618
619         LASSERT(obd_device_cachep == NULL);
620         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
621                                                  sizeof(struct obd_device),
622                                                  0, 0);
623         if (!obd_device_cachep)
624                 GOTO(out, -ENOMEM);
625
626         LASSERT(obdo_cachep == NULL);
627         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
628                                            0, 0);
629         if (!obdo_cachep)
630                 GOTO(out, -ENOMEM);
631
632         LASSERT(import_cachep == NULL);
633         import_cachep = cfs_mem_cache_create("ll_import_cache",
634                                              sizeof(struct obd_import),
635                                              0, 0);
636         if (!import_cachep)
637                 GOTO(out, -ENOMEM);
638
639         LASSERT(capa_cachep == NULL);
640         capa_cachep = cfs_mem_cache_create("capa_cache",
641                                            sizeof(struct obd_capa), 0, 0);
642         if (!capa_cachep)
643                 GOTO(out, -ENOMEM);
644
645         RETURN(0);
646  out:
647         obd_cleanup_caches();
648         RETURN(-ENOMEM);
649
650 }
651
652 /* map connection to client */
653 struct obd_export *class_conn2export(struct lustre_handle *conn)
654 {
655         struct obd_export *export;
656         ENTRY;
657
658         if (!conn) {
659                 CDEBUG(D_CACHE, "looking for null handle\n");
660                 RETURN(NULL);
661         }
662
663         if (conn->cookie == -1) {  /* this means assign a new connection */
664                 CDEBUG(D_CACHE, "want a new connection\n");
665                 RETURN(NULL);
666         }
667
668         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
669         export = class_handle2object(conn->cookie);
670         RETURN(export);
671 }
672
673 struct obd_device *class_exp2obd(struct obd_export *exp)
674 {
675         if (exp)
676                 return exp->exp_obd;
677         return NULL;
678 }
679
680 struct obd_device *class_conn2obd(struct lustre_handle *conn)
681 {
682         struct obd_export *export;
683         export = class_conn2export(conn);
684         if (export) {
685                 struct obd_device *obd = export->exp_obd;
686                 class_export_put(export);
687                 return obd;
688         }
689         return NULL;
690 }
691
692 struct obd_import *class_exp2cliimp(struct obd_export *exp)
693 {
694         struct obd_device *obd = exp->exp_obd;
695         if (obd == NULL)
696                 return NULL;
697         return obd->u.cli.cl_import;
698 }
699
700 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
701 {
702         struct obd_device *obd = class_conn2obd(conn);
703         if (obd == NULL)
704                 return NULL;
705         return obd->u.cli.cl_import;
706 }
707
708 /* Export management functions */
709 static void class_export_destroy(struct obd_export *exp)
710 {
711         struct obd_device *obd = exp->exp_obd;
712         ENTRY;
713
714         LASSERT (atomic_read(&exp->exp_refcount) == 0);
715
716         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
717                exp->exp_client_uuid.uuid, obd->obd_name);
718
719         LASSERT(obd != NULL);
720
721         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
722         if (exp->exp_connection)
723                 ptlrpc_put_connection_superhack(exp->exp_connection);
724
725         LASSERT(list_empty(&exp->exp_outstanding_replies));
726         LASSERT(list_empty(&exp->exp_uncommitted_replies));
727         LASSERT(list_empty(&exp->exp_req_replay_queue));
728         LASSERT(list_empty(&exp->exp_queued_rpc));
729         obd_destroy_export(exp);
730         class_decref(obd, "export", exp);
731
732         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
733         EXIT;
734 }
735
736 static void export_handle_addref(void *export)
737 {
738         class_export_get(export);
739 }
740
741 struct obd_export *class_export_get(struct obd_export *exp)
742 {
743         atomic_inc(&exp->exp_refcount);
744         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
745                atomic_read(&exp->exp_refcount));
746         return exp;
747 }
748 EXPORT_SYMBOL(class_export_get);
749
750 void class_export_put(struct obd_export *exp)
751 {
752         LASSERT(exp != NULL);
753         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
754                atomic_read(&exp->exp_refcount) - 1);
755         LASSERT(atomic_read(&exp->exp_refcount) > 0);
756         LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
757
758         if (atomic_dec_and_test(&exp->exp_refcount)) {
759                 CDEBUG(D_IOCTL, "final put %p/%s\n",
760                        exp, exp->exp_client_uuid.uuid);
761                 obd_zombie_export_add(exp);
762         }
763 }
764 EXPORT_SYMBOL(class_export_put);
765
766 /* Creates a new export, adds it to the hash table, and returns a
767  * pointer to it. The refcount is 2: one for the hash reference, and
768  * one for the pointer returned by this function. */
769 struct obd_export *class_new_export(struct obd_device *obd,
770                                     struct obd_uuid *cluuid)
771 {
772         struct obd_export *export;
773         int rc = 0;
774
775         OBD_ALLOC_PTR(export);
776         if (!export)
777                 return ERR_PTR(-ENOMEM);
778
779         export->exp_conn_cnt = 0;
780         export->exp_lock_hash = NULL;
781         atomic_set(&export->exp_refcount, 2);
782         atomic_set(&export->exp_rpc_count, 0);
783         export->exp_obd = obd;
784         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
785         spin_lock_init(&export->exp_uncommitted_replies_lock);
786         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
787         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
788         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
789         CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
790         class_handle_hash(&export->exp_handle, export_handle_addref);
791         export->exp_last_request_time = cfs_time_current_sec();
792         spin_lock_init(&export->exp_lock);
793         INIT_HLIST_NODE(&export->exp_uuid_hash);
794         INIT_HLIST_NODE(&export->exp_nid_hash);
795
796         export->exp_sp_peer = LUSTRE_SP_ANY;
797         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
798         export->exp_client_uuid = *cluuid;
799         obd_init_export(export);
800
801         spin_lock(&obd->obd_dev_lock);
802         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
803                 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
804                                             &export->exp_uuid_hash);
805                 if (rc != 0) {
806                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
807                                       obd->obd_name, cluuid->uuid, rc);
808                         spin_unlock(&obd->obd_dev_lock);
809                         class_handle_unhash(&export->exp_handle);
810                         OBD_FREE_PTR(export);
811                         return ERR_PTR(-EALREADY);
812                 }
813         }
814
815         LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
816         class_incref(obd, "export", export);
817         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
818         list_add_tail(&export->exp_obd_chain_timed,
819                       &export->exp_obd->obd_exports_timed);
820         export->exp_obd->obd_num_exports++;
821         spin_unlock(&obd->obd_dev_lock);
822
823         return export;
824 }
825 EXPORT_SYMBOL(class_new_export);
826
827 void class_unlink_export(struct obd_export *exp)
828 {
829         class_handle_unhash(&exp->exp_handle);
830
831         spin_lock(&exp->exp_obd->obd_dev_lock);
832         /* delete an uuid-export hashitem from hashtables */
833         if (!hlist_unhashed(&exp->exp_uuid_hash))
834                 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
835                                 &exp->exp_client_uuid,
836                                 &exp->exp_uuid_hash);
837
838         list_del_init(&exp->exp_obd_chain);
839         list_del_init(&exp->exp_obd_chain_timed);
840         exp->exp_obd->obd_num_exports--;
841         spin_unlock(&exp->exp_obd->obd_dev_lock);
842
843         /* Keep these counter valid always */
844         spin_lock_bh(&exp->exp_obd->obd_processing_task_lock);
845         if (exp->exp_delayed)
846                 exp->exp_obd->obd_delayed_clients--;
847         else if (exp->exp_in_recovery)
848                 exp->exp_obd->obd_recoverable_clients--;
849         else if (exp->exp_obd->obd_recovering)
850                 exp->exp_obd->obd_max_recoverable_clients--;
851         spin_unlock_bh(&exp->exp_obd->obd_processing_task_lock);
852         class_export_put(exp);
853 }
854 EXPORT_SYMBOL(class_unlink_export);
855
856 /* Import management functions */
857 void class_import_destroy(struct obd_import *imp)
858 {
859         ENTRY;
860
861         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
862                 imp->imp_obd->obd_name);
863
864         LASSERT(atomic_read(&imp->imp_refcount) == 0);
865
866         ptlrpc_put_connection_superhack(imp->imp_connection);
867
868         while (!list_empty(&imp->imp_conn_list)) {
869                 struct obd_import_conn *imp_conn;
870
871                 imp_conn = list_entry(imp->imp_conn_list.next,
872                                       struct obd_import_conn, oic_item);
873                 list_del_init(&imp_conn->oic_item);
874                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
875                 OBD_FREE(imp_conn, sizeof(*imp_conn));
876         }
877
878         LASSERT(imp->imp_sec == NULL);
879         class_decref(imp->imp_obd, "import", imp);
880         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
881         EXIT;
882 }
883
884 static void import_handle_addref(void *import)
885 {
886         class_import_get(import);
887 }
888
889 struct obd_import *class_import_get(struct obd_import *import)
890 {
891         LASSERT(atomic_read(&import->imp_refcount) >= 0);
892         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
893         atomic_inc(&import->imp_refcount);
894         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
895                atomic_read(&import->imp_refcount), 
896                import->imp_obd->obd_name);
897         return import;
898 }
899 EXPORT_SYMBOL(class_import_get);
900
901 void class_import_put(struct obd_import *imp)
902 {
903         ENTRY;
904
905         LASSERT(atomic_read(&imp->imp_refcount) > 0);
906         LASSERT(atomic_read(&imp->imp_refcount) < 0x5a5a5a);
907         LASSERT(list_empty(&imp->imp_zombie_chain));
908
909         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
910                atomic_read(&imp->imp_refcount) - 1, 
911                imp->imp_obd->obd_name);
912
913         if (atomic_dec_and_test(&imp->imp_refcount)) {
914                 CDEBUG(D_INFO, "final put import %p\n", imp);
915                 obd_zombie_import_add(imp);
916         }
917
918         EXIT;
919 }
920 EXPORT_SYMBOL(class_import_put);
921
922 static void init_imp_at(struct imp_at *at) {
923         int i;
924         at_init(&at->iat_net_latency, 0, 0);
925         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
926                 /* max service estimates are tracked on the server side, so
927                    don't use the AT history here, just use the last reported
928                    val. (But keep hist for proc histogram, worst_ever) */
929                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
930                         AT_FLG_NOHIST);
931         }
932 }
933
934 struct obd_import *class_new_import(struct obd_device *obd)
935 {
936         struct obd_import *imp;
937
938         OBD_ALLOC(imp, sizeof(*imp));
939         if (imp == NULL)
940                 return NULL;
941
942         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
943         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
944         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
945         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
946         spin_lock_init(&imp->imp_lock);
947         imp->imp_last_success_conn = 0;
948         imp->imp_state = LUSTRE_IMP_NEW;
949         imp->imp_obd = class_incref(obd, "import", imp);
950         sema_init(&imp->imp_sec_mutex, 1);
951         cfs_waitq_init(&imp->imp_recovery_waitq);
952
953         atomic_set(&imp->imp_refcount, 2);
954         atomic_set(&imp->imp_unregistering, 0);
955         atomic_set(&imp->imp_inflight, 0);
956         atomic_set(&imp->imp_replay_inflight, 0);
957         atomic_set(&imp->imp_inval_count, 0);
958         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
959         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
960         class_handle_hash(&imp->imp_handle, import_handle_addref);
961         init_imp_at(&imp->imp_at);
962
963         /* the default magic is V2, will be used in connect RPC, and
964          * then adjusted according to the flags in request/reply. */
965         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
966
967         return imp;
968 }
969 EXPORT_SYMBOL(class_new_import);
970
971 void class_destroy_import(struct obd_import *import)
972 {
973         LASSERT(import != NULL);
974         LASSERT(import != LP_POISON);
975
976         class_handle_unhash(&import->imp_handle);
977
978         spin_lock(&import->imp_lock);
979         import->imp_generation++;
980         spin_unlock(&import->imp_lock);
981         class_import_put(import);
982 }
983 EXPORT_SYMBOL(class_destroy_import);
984
985 /* A connection defines an export context in which preallocation can
986    be managed. This releases the export pointer reference, and returns
987    the export handle, so the export refcount is 1 when this function
988    returns. */
989 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
990                   struct obd_uuid *cluuid)
991 {
992         struct obd_export *export;
993         LASSERT(conn != NULL);
994         LASSERT(obd != NULL);
995         LASSERT(cluuid != NULL);
996         ENTRY;
997
998         export = class_new_export(obd, cluuid);
999         if (IS_ERR(export))
1000                 RETURN(PTR_ERR(export));
1001
1002         conn->cookie = export->exp_handle.h_cookie;
1003         class_export_put(export);
1004
1005         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1006                cluuid->uuid, conn->cookie);
1007         RETURN(0);
1008 }
1009 EXPORT_SYMBOL(class_connect);
1010
1011 /* if export is involved in recovery then clean up related things */
1012 void class_export_recovery_cleanup(struct obd_export *exp)
1013 {
1014         struct obd_device *obd = exp->exp_obd;
1015
1016         spin_lock_bh(&obd->obd_processing_task_lock);
1017         if (obd->obd_recovering && exp->exp_in_recovery) {
1018                 spin_lock(&exp->exp_lock);
1019                 exp->exp_in_recovery = 0;
1020                 spin_unlock(&exp->exp_lock);
1021                 obd->obd_connected_clients--;
1022                 /* each connected client is counted as recoverable */
1023                 obd->obd_recoverable_clients--;
1024                 if (exp->exp_req_replay_needed) {
1025                         spin_lock(&exp->exp_lock);
1026                         exp->exp_req_replay_needed = 0;
1027                         spin_unlock(&exp->exp_lock);
1028                         LASSERT(atomic_read(&obd->obd_req_replay_clients));
1029                         atomic_dec(&obd->obd_req_replay_clients);
1030                 }
1031                 if (exp->exp_lock_replay_needed) {
1032                         spin_lock(&exp->exp_lock);
1033                         exp->exp_lock_replay_needed = 0;
1034                         spin_unlock(&exp->exp_lock);
1035                         LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1036                         atomic_dec(&obd->obd_lock_replay_clients);
1037                 }
1038         }
1039         spin_unlock_bh(&obd->obd_processing_task_lock);
1040 }
1041
1042 /* This function removes 1-3 references from the export:
1043  * 1 - for export pointer passed
1044  * and if disconnect really need
1045  * 2 - removing from hash
1046  * 3 - in client_unlink_export
1047  * The export pointer passed to this function can destroyed */
1048 int class_disconnect(struct obd_export *export)
1049 {
1050         int already_disconnected;
1051         ENTRY;
1052
1053         if (export == NULL) {
1054                 fixme();
1055                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1056                 RETURN(-EINVAL);
1057         }
1058
1059         spin_lock(&export->exp_lock);
1060         already_disconnected = export->exp_disconnected;
1061         export->exp_disconnected = 1;
1062         spin_unlock(&export->exp_lock);
1063
1064         /* class_cleanup(), abort_recovery(), and class_fail_export()
1065          * all end up in here, and if any of them race we shouldn't
1066          * call extra class_export_puts(). */
1067         if (already_disconnected) {
1068                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1069                 GOTO(no_disconn, already_disconnected);
1070         }
1071
1072         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1073                export->exp_handle.h_cookie);
1074
1075         if (!hlist_unhashed(&export->exp_nid_hash))
1076                 lustre_hash_del(export->exp_obd->obd_nid_hash,
1077                                 &export->exp_connection->c_peer.nid,
1078                                 &export->exp_nid_hash);
1079
1080         class_export_recovery_cleanup(export);
1081         class_unlink_export(export);
1082 no_disconn:
1083         class_export_put(export);
1084         RETURN(0);
1085 }
1086
1087 static void class_disconnect_export_list(struct list_head *list,
1088                                          enum obd_option flags)
1089 {
1090         int rc;
1091         struct obd_export *exp;
1092         ENTRY;
1093
1094         /* It's possible that an export may disconnect itself, but
1095          * nothing else will be added to this list. */
1096         while (!list_empty(list)) {
1097                 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1098                 /* need for safe call CDEBUG after obd_disconnect */
1099                 class_export_get(exp);
1100
1101                 spin_lock(&exp->exp_lock);
1102                 exp->exp_flags = flags;
1103                 spin_unlock(&exp->exp_lock);
1104
1105                 if (obd_uuid_equals(&exp->exp_client_uuid,
1106                                     &exp->exp_obd->obd_uuid)) {
1107                         CDEBUG(D_HA,
1108                                "exp %p export uuid == obd uuid, don't discon\n",
1109                                exp);
1110                         /* Need to delete this now so we don't end up pointing
1111                          * to work_list later when this export is cleaned up. */
1112                         list_del_init(&exp->exp_obd_chain);
1113                         class_export_put(exp);
1114                         continue;
1115                 }
1116
1117                 class_export_get(exp);
1118                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1119                        "last request at "CFS_TIME_T"\n",
1120                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1121                        exp, exp->exp_last_request_time);
1122                 /* release one export reference anyway */
1123                 rc = obd_disconnect(exp);
1124
1125                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1126                        obd_export_nid2str(exp), exp, rc);
1127                 class_export_put(exp);
1128         }
1129         EXIT;
1130 }
1131
1132 void class_disconnect_exports(struct obd_device *obd)
1133 {
1134         struct list_head work_list;
1135         ENTRY;
1136
1137         /* Move all of the exports from obd_exports to a work list, en masse. */
1138         CFS_INIT_LIST_HEAD(&work_list);
1139         spin_lock(&obd->obd_dev_lock);
1140         list_splice_init(&obd->obd_exports, &work_list);
1141         list_splice_init(&obd->obd_delayed_exports, &work_list);
1142         spin_unlock(&obd->obd_dev_lock);
1143
1144         if (!list_empty(&work_list)) {
1145                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1146                        "disconnecting them\n", obd->obd_minor, obd);
1147                 class_disconnect_export_list(&work_list,
1148                                              exp_flags_from_obd(obd));
1149         } else
1150                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1151                        obd->obd_minor, obd);
1152         EXIT;
1153 }
1154 EXPORT_SYMBOL(class_disconnect_exports);
1155
1156 /* Remove exports that have not completed recovery.
1157  */
1158 void class_disconnect_stale_exports(struct obd_device *obd,
1159                                     int (*test_export)(struct obd_export *),
1160                                     enum obd_option flags)
1161 {
1162         struct list_head work_list;
1163         struct list_head *pos, *n;
1164         struct obd_export *exp;
1165         ENTRY;
1166
1167         CFS_INIT_LIST_HEAD(&work_list);
1168         spin_lock(&obd->obd_dev_lock);
1169         obd->obd_stale_clients = 0;
1170         list_for_each_safe(pos, n, &obd->obd_exports) {
1171                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1172                 if (test_export(exp))
1173                         continue;
1174
1175                 list_move(&exp->exp_obd_chain, &work_list);
1176                 /* don't count self-export as client */
1177                 if (obd_uuid_equals(&exp->exp_client_uuid,
1178                                      &exp->exp_obd->obd_uuid))
1179                         continue;
1180
1181                 obd->obd_stale_clients++;
1182                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1183                        obd->obd_name, exp->exp_client_uuid.uuid,
1184                        exp->exp_connection == NULL ? "<unknown>" :
1185                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1186         }
1187         spin_unlock(&obd->obd_dev_lock);
1188
1189         CDEBUG(D_HA, "%s: disconnecting %d stale clients\n", obd->obd_name,
1190                obd->obd_stale_clients);
1191
1192         class_disconnect_export_list(&work_list, flags);
1193         EXIT;
1194 }
1195 EXPORT_SYMBOL(class_disconnect_stale_exports);
1196
1197 void class_fail_export(struct obd_export *exp)
1198 {
1199         int rc, already_failed;
1200
1201         spin_lock(&exp->exp_lock);
1202         already_failed = exp->exp_failed;
1203         exp->exp_failed = 1;
1204         spin_unlock(&exp->exp_lock);
1205
1206         if (already_failed) {
1207                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1208                        exp, exp->exp_client_uuid.uuid);
1209                 return;
1210         }
1211
1212         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1213                exp, exp->exp_client_uuid.uuid);
1214
1215         if (obd_dump_on_timeout)
1216                 libcfs_debug_dumplog();
1217
1218         /* Most callers into obd_disconnect are removing their own reference
1219          * (request, for example) in addition to the one from the hash table.
1220          * We don't have such a reference here, so make one. */
1221         class_export_get(exp);
1222         rc = obd_disconnect(exp);
1223         if (rc)
1224                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1225         else
1226                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1227                        exp, exp->exp_client_uuid.uuid);
1228 }
1229 EXPORT_SYMBOL(class_fail_export);
1230
1231 char *obd_export_nid2str(struct obd_export *exp)
1232 {
1233         if (exp->exp_connection != NULL)
1234                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1235
1236         return "(no nid)";
1237 }
1238 EXPORT_SYMBOL(obd_export_nid2str);
1239
1240 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1241 {
1242         struct obd_export *doomed_exp = NULL;
1243         int exports_evicted = 0;
1244
1245         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1246
1247         do {
1248                 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1249                 if (doomed_exp == NULL)
1250                         break;
1251
1252                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1253                          "nid %s found, wanted nid %s, requested nid %s\n",
1254                          obd_export_nid2str(doomed_exp),
1255                          libcfs_nid2str(nid_key), nid);
1256                 LASSERTF(doomed_exp != obd->obd_self_export,
1257                          "self-export is hashed by NID?\n");
1258                 exports_evicted++;
1259                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1260                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1261                        exports_evicted);
1262                 class_fail_export(doomed_exp);
1263                 class_export_put(doomed_exp);
1264         } while (1);
1265
1266         if (!exports_evicted)
1267                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1268                        obd->obd_name, nid);
1269         return exports_evicted;
1270 }
1271 EXPORT_SYMBOL(obd_export_evict_by_nid);
1272
1273 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1274 {
1275         struct obd_export *doomed_exp = NULL;
1276         struct obd_uuid doomed_uuid;
1277         int exports_evicted = 0;
1278
1279         obd_str2uuid(&doomed_uuid, uuid);
1280         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1281                 CERROR("%s: can't evict myself\n", obd->obd_name);
1282                 return exports_evicted;
1283         }
1284
1285         doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1286
1287         if (doomed_exp == NULL) {
1288                 CERROR("%s: can't disconnect %s: no exports found\n",
1289                        obd->obd_name, uuid);
1290         } else {
1291                 CWARN("%s: evicting %s at adminstrative request\n",
1292                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1293                 class_fail_export(doomed_exp);
1294                 class_export_put(doomed_exp);
1295                 exports_evicted++;
1296         }
1297
1298         return exports_evicted;
1299 }
1300 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1301
1302 /**
1303  * kill zombie imports and exports
1304  */
1305 void obd_zombie_impexp_cull(void)
1306 {
1307         struct obd_import *import;
1308         struct obd_export *export;
1309         ENTRY;
1310
1311         do {
1312                 spin_lock(&obd_zombie_impexp_lock);
1313
1314                 import = NULL;
1315                 if (!list_empty(&obd_zombie_imports)) {
1316                         import = list_entry(obd_zombie_imports.next,
1317                                             struct obd_import,
1318                                             imp_zombie_chain);
1319                         list_del_init(&import->imp_zombie_chain);
1320                 }
1321
1322                 export = NULL;
1323                 if (!list_empty(&obd_zombie_exports)) {
1324                         export = list_entry(obd_zombie_exports.next,
1325                                             struct obd_export,
1326                                             exp_obd_chain);
1327                         list_del_init(&export->exp_obd_chain);
1328                 }
1329
1330                 spin_unlock(&obd_zombie_impexp_lock);
1331
1332                 if (import != NULL)
1333                         class_import_destroy(import);
1334
1335                 if (export != NULL)
1336                         class_export_destroy(export);
1337
1338         } while (import != NULL || export != NULL);
1339         EXIT;
1340 }
1341
1342 static struct completion        obd_zombie_start;
1343 static struct completion        obd_zombie_stop;
1344 static unsigned long            obd_zombie_flags;
1345 static cfs_waitq_t              obd_zombie_waitq;
1346
1347 enum {
1348         OBD_ZOMBIE_STOP   = 1 << 1
1349 };
1350
1351 /**
1352  * check for work for kill zombie import/export thread.
1353  */
1354 static int obd_zombie_impexp_check(void *arg)
1355 {
1356         int rc;
1357
1358         spin_lock(&obd_zombie_impexp_lock);
1359         rc = list_empty(&obd_zombie_imports) &&
1360              list_empty(&obd_zombie_exports) &&
1361              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1362
1363         spin_unlock(&obd_zombie_impexp_lock);
1364
1365         RETURN(rc);
1366 }
1367
1368 /**
1369  * Add export to the obd_zombe thread and notify it.
1370  */
1371 static void obd_zombie_export_add(struct obd_export *exp) {
1372         spin_lock(&obd_zombie_impexp_lock);
1373         LASSERT(list_empty(&exp->exp_obd_chain));
1374         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1375         spin_unlock(&obd_zombie_impexp_lock);
1376
1377         if (obd_zombie_impexp_notify != NULL)
1378                 obd_zombie_impexp_notify();
1379 }
1380
1381 /**
1382  * Add import to the obd_zombe thread and notify it.
1383  */
1384 static void obd_zombie_import_add(struct obd_import *imp) {
1385         LASSERT(imp->imp_sec == NULL);
1386         spin_lock(&obd_zombie_impexp_lock);
1387         LASSERT(list_empty(&imp->imp_zombie_chain));
1388         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1389         spin_unlock(&obd_zombie_impexp_lock);
1390
1391         if (obd_zombie_impexp_notify != NULL)
1392                 obd_zombie_impexp_notify();
1393 }
1394
1395 /**
1396  * notify import/export destroy thread about new zombie.
1397  */
1398 static void obd_zombie_impexp_notify(void)
1399 {
1400         cfs_waitq_signal(&obd_zombie_waitq);
1401 }
1402
1403 /**
1404  * check whether obd_zombie is idle
1405  */
1406 static int obd_zombie_is_idle(void)
1407 {
1408         int rc;
1409
1410         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1411         spin_lock(&obd_zombie_impexp_lock);
1412         rc = list_empty(&obd_zombie_imports) &&
1413              list_empty(&obd_zombie_exports);
1414         spin_unlock(&obd_zombie_impexp_lock);
1415         return rc;
1416 }
1417
1418 /**
1419  * wait when obd_zombie import/export queues become empty
1420  */
1421 void obd_zombie_barrier(void)
1422 {
1423         struct l_wait_info lwi = { 0 };
1424         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1425 }
1426 EXPORT_SYMBOL(obd_zombie_barrier);
1427
1428 #ifdef __KERNEL__
1429
1430 /**
1431  * destroy zombie export/import thread.
1432  */
1433 static int obd_zombie_impexp_thread(void *unused)
1434 {
1435         int rc;
1436
1437         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1438                 complete(&obd_zombie_start);
1439                 RETURN(rc);
1440         }
1441
1442         complete(&obd_zombie_start);
1443
1444         while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1445                 struct l_wait_info lwi = { 0 };
1446
1447                 l_wait_event(obd_zombie_waitq, 
1448                              !obd_zombie_impexp_check(NULL), &lwi);
1449                 obd_zombie_impexp_cull();
1450
1451                 /* 
1452                  * Notify obd_zombie_barrier callers that queues
1453                  * may be empty.
1454                  */
1455                 cfs_waitq_signal(&obd_zombie_waitq);
1456         }
1457
1458         complete(&obd_zombie_stop);
1459
1460         RETURN(0);
1461 }
1462
1463 #else /* ! KERNEL */
1464
1465 static atomic_t zombie_recur = ATOMIC_INIT(0);
1466 static void *obd_zombie_impexp_work_cb;
1467 static void *obd_zombie_impexp_idle_cb;
1468
1469 int obd_zombie_impexp_kill(void *arg)
1470 {
1471         int rc = 0;
1472
1473         if (atomic_inc_return(&zombie_recur) == 1) {
1474                 obd_zombie_impexp_cull();
1475                 rc = 1;
1476         }
1477         atomic_dec(&zombie_recur);
1478         return rc;
1479 }
1480
1481 #endif
1482
1483 /**
1484  * start destroy zombie import/export thread
1485  */
1486 int obd_zombie_impexp_init(void)
1487 {
1488         int rc;
1489
1490         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1491         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1492         spin_lock_init(&obd_zombie_impexp_lock);
1493         init_completion(&obd_zombie_start);
1494         init_completion(&obd_zombie_stop);
1495         cfs_waitq_init(&obd_zombie_waitq);
1496
1497 #ifdef __KERNEL__
1498         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1499         if (rc < 0)
1500                 RETURN(rc);
1501
1502         wait_for_completion(&obd_zombie_start);
1503 #else
1504
1505         obd_zombie_impexp_work_cb =
1506                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1507                                                  &obd_zombie_impexp_kill, NULL);
1508
1509         obd_zombie_impexp_idle_cb =
1510                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1511                                                  &obd_zombie_impexp_check, NULL);
1512         rc = 0;
1513 #endif
1514         RETURN(rc);
1515 }
1516 /**
1517  * stop destroy zombie import/export thread
1518  */
1519 void obd_zombie_impexp_stop(void)
1520 {
1521         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1522         obd_zombie_impexp_notify();
1523 #ifdef __KERNEL__
1524         wait_for_completion(&obd_zombie_stop);
1525 #else
1526         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1527         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1528 #endif
1529 }