Whamcloud - gitweb
- land b_hd_ver_recov
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
50
/* List of registered obd types (defined elsewhere); the functions in this
 * file walk and modify it under obd_types_lock. */
extern struct list_head obd_types;
spinlock_t obd_types_lock;

/* Slab caches set up by obd_init_caches() and torn down by
 * obd_cleanup_caches(). */
cfs_mem_cache_t *obd_device_cachep;
cfs_mem_cache_t *obdo_cachep;
EXPORT_SYMBOL(obdo_cachep);
cfs_mem_cache_t *import_cachep;

/* Zombie bookkeeping: class_export_put()/class_import_put() hand an object
 * to obd_zombie_export_add()/obd_zombie_import_add() once its last reference
 * is dropped.  NOTE(review): presumably the lists below hold those objects
 * until they are destroyed - confirm against the zombie helpers. */
struct list_head  obd_zombie_imports;
struct list_head  obd_zombie_exports;
spinlock_t        obd_zombie_impexp_lock;
static void obd_zombie_impexp_notify(void);
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);

/* Hook used by the export/import destructors in this file to release a
 * ptlrpc connection; it is not assigned anywhere in the visible code. */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67
68 /*
69  * support functions: we could use inter-module communication, but this
70  * is more portable to other OS's
71  */
72 static struct obd_device *obd_device_alloc(void)
73 {
74         struct obd_device *obd;
75
76         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
77         if (obd != NULL) {
78                 obd->obd_magic = OBD_DEVICE_MAGIC;
79         }
80         return obd;
81 }
82 EXPORT_SYMBOL(obd_device_alloc);
83
84 static void obd_device_free(struct obd_device *obd)
85 {
86         LASSERT(obd != NULL);
87         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
89         if (obd->obd_namespace != NULL) {
90                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91                        obd, obd->obd_namespace, obd->obd_force);
92                 LBUG();
93         }
94         lu_ref_fini(&obd->obd_reference);
95         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
96 }
97
98 struct obd_type *class_search_type(const char *name)
99 {
100         struct list_head *tmp;
101         struct obd_type *type;
102
103         spin_lock(&obd_types_lock);
104         list_for_each(tmp, &obd_types) {
105                 type = list_entry(tmp, struct obd_type, typ_chain);
106                 if (strcmp(type->typ_name, name) == 0) {
107                         spin_unlock(&obd_types_lock);
108                         return type;
109                 }
110         }
111         spin_unlock(&obd_types_lock);
112         return NULL;
113 }
114
115 struct obd_type *class_get_type(const char *name)
116 {
117         struct obd_type *type = class_search_type(name);
118
119 #ifdef CONFIG_KMOD
120         if (!type) {
121                 const char *modname = name;
122                 if (!request_module(modname)) {
123                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
124                         type = class_search_type(name);
125                 } else {
126                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
127                                            modname);
128                 }
129         }
130 #endif
131         if (type) {
132                 spin_lock(&type->obd_type_lock);
133                 type->typ_refcnt++;
134                 try_module_get(type->typ_dt_ops->o_owner);
135                 spin_unlock(&type->obd_type_lock);
136         }
137         return type;
138 }
139
140 void class_put_type(struct obd_type *type)
141 {
142         LASSERT(type);
143         spin_lock(&type->obd_type_lock);
144         type->typ_refcnt--;
145         module_put(type->typ_dt_ops->o_owner);
146         spin_unlock(&type->obd_type_lock);
147 }
148
149 #define CLASS_MAX_NAME 1024
150
151 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
152                         struct lprocfs_vars *vars, const char *name,
153                         struct lu_device_type *ldt)
154 {
155         struct obd_type *type;
156         int rc = 0;
157         ENTRY;
158
159         /* sanity check */
160         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
161
162         if (class_search_type(name)) {
163                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
164                 RETURN(-EEXIST);
165         }
166
167         rc = -ENOMEM;
168         OBD_ALLOC(type, sizeof(*type));
169         if (type == NULL)
170                 RETURN(rc);
171
172         OBD_ALLOC_PTR(type->typ_dt_ops);
173         OBD_ALLOC_PTR(type->typ_md_ops);
174         OBD_ALLOC(type->typ_name, strlen(name) + 1);
175
176         if (type->typ_dt_ops == NULL ||
177             type->typ_md_ops == NULL ||
178             type->typ_name == NULL)
179                 GOTO (failed, rc);
180
181         *(type->typ_dt_ops) = *dt_ops;
182         /* md_ops is optional */
183         if (md_ops)
184                 *(type->typ_md_ops) = *md_ops;
185         strcpy(type->typ_name, name);
186         spin_lock_init(&type->obd_type_lock);
187
188 #ifdef LPROCFS
189         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
190                                               vars, type);
191         if (IS_ERR(type->typ_procroot)) {
192                 rc = PTR_ERR(type->typ_procroot);
193                 type->typ_procroot = NULL;
194                 GOTO (failed, rc);
195         }
196 #endif
197         if (ldt != NULL) {
198                 type->typ_lu = ldt;
199                 rc = lu_device_type_init(ldt);
200                 if (rc != 0)
201                         GOTO (failed, rc);
202         }
203
204         spin_lock(&obd_types_lock);
205         list_add(&type->typ_chain, &obd_types);
206         spin_unlock(&obd_types_lock);
207
208         RETURN (0);
209
210  failed:
211         if (type->typ_name != NULL)
212                 OBD_FREE(type->typ_name, strlen(name) + 1);
213         if (type->typ_md_ops != NULL)
214                 OBD_FREE_PTR(type->typ_md_ops);
215         if (type->typ_dt_ops != NULL)
216                 OBD_FREE_PTR(type->typ_dt_ops);
217         OBD_FREE(type, sizeof(*type));
218         RETURN(rc);
219 }
220
221 int class_unregister_type(const char *name)
222 {
223         struct obd_type *type = class_search_type(name);
224         ENTRY;
225
226         if (!type) {
227                 CERROR("unknown obd type\n");
228                 RETURN(-EINVAL);
229         }
230
231         if (type->typ_refcnt) {
232                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
233                 /* This is a bad situation, let's make the best of it */
234                 /* Remove ops, but leave the name for debugging */
235                 OBD_FREE_PTR(type->typ_dt_ops);
236                 OBD_FREE_PTR(type->typ_md_ops);
237                 RETURN(-EBUSY);
238         }
239
240         if (type->typ_procroot) {
241                 lprocfs_remove(&type->typ_procroot);
242         }
243
244         if (type->typ_lu)
245                 lu_device_type_fini(type->typ_lu);
246
247         spin_lock(&obd_types_lock);
248         list_del(&type->typ_chain);
249         spin_unlock(&obd_types_lock);
250         OBD_FREE(type->typ_name, strlen(name) + 1);
251         if (type->typ_dt_ops != NULL)
252                 OBD_FREE_PTR(type->typ_dt_ops);
253         if (type->typ_md_ops != NULL)
254                 OBD_FREE_PTR(type->typ_md_ops);
255         OBD_FREE(type, sizeof(*type));
256         RETURN(0);
257 } /* class_unregister_type */
258
259 /**
260  * Create a new obd device.
261  *
262  * Find an empty slot in ::obd_devs[], create a new obd device in it.
263  *
264  * \param typename [in] obd device type string.
265  * \param name     [in] obd device name.
266  *
267  * \retval NULL if create fails, otherwise return the obd device
268  *         pointer created.
269  */
270 struct obd_device *class_newdev(const char *type_name, const char *name)
271 {
272         struct obd_device *result = NULL;
273         struct obd_device *newdev;
274         struct obd_type *type = NULL;
275         int i;
276         int new_obd_minor = 0;
277
278         if (strlen(name) >= MAX_OBD_NAME) {
279                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
280                 RETURN(ERR_PTR(-EINVAL));
281         }
282
283         type = class_get_type(type_name);
284         if (type == NULL){
285                 CERROR("OBD: unknown type: %s\n", type_name);
286                 RETURN(ERR_PTR(-ENODEV));
287         }
288
289         newdev = obd_device_alloc();
290         if (newdev == NULL) {
291                 class_put_type(type);
292                 RETURN(ERR_PTR(-ENOMEM));
293         }
294         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
295
296         spin_lock(&obd_dev_lock);
297         for (i = 0; i < class_devno_max(); i++) {
298                 struct obd_device *obd = class_num2obd(i);
299                 if (obd && obd->obd_name &&
300                     (strcmp(name, obd->obd_name) == 0)) {
301                         CERROR("Device %s already exists, won't add\n", name);
302                         if (result) {
303                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
304                                          "%p obd_magic %08x != %08x\n", result,
305                                          result->obd_magic, OBD_DEVICE_MAGIC);
306                                 LASSERTF(result->obd_minor == new_obd_minor,
307                                          "%p obd_minor %d != %d\n", result,
308                                          result->obd_minor, new_obd_minor);
309
310                                 obd_devs[result->obd_minor] = NULL;
311                                 result->obd_name[0]='\0';
312                          }
313                         result = ERR_PTR(-EEXIST);
314                         break;
315                 }
316                 if (!result && !obd) {
317                         result = newdev;
318                         result->obd_minor = i;
319                         new_obd_minor = i;
320                         result->obd_type = type;
321                         strncpy(result->obd_name, name,
322                                 sizeof(result->obd_name) - 1);
323                         obd_devs[i] = result;
324                 }
325         }
326         spin_unlock(&obd_dev_lock);
327
328         if (result == NULL && i >= class_devno_max()) {
329                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
330                        class_devno_max());
331                 result = ERR_PTR(-EOVERFLOW);
332         }
333
334         if (IS_ERR(result)) {
335                 obd_device_free(newdev);
336                 class_put_type(type);
337         } else {
338                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
339                        result->obd_name, result);
340         }
341         return result;
342 }
343
344 void class_release_dev(struct obd_device *obd)
345 {
346         struct obd_type *obd_type = obd->obd_type;
347
348         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
349                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
350         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
351                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
352         LASSERT(obd_type != NULL);
353
354         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
355                obd->obd_name,obd->obd_type->typ_name);
356
357         spin_lock(&obd_dev_lock);
358         obd_devs[obd->obd_minor] = NULL;
359         spin_unlock(&obd_dev_lock);
360         obd_device_free(obd);
361
362         class_put_type(obd_type);
363 }
364
365 int class_name2dev(const char *name)
366 {
367         int i;
368
369         if (!name)
370                 return -1;
371
372         spin_lock(&obd_dev_lock);
373         for (i = 0; i < class_devno_max(); i++) {
374                 struct obd_device *obd = class_num2obd(i);
375                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
376                         /* Make sure we finished attaching before we give
377                            out any references */
378                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
379                         if (obd->obd_attached) {
380                                 spin_unlock(&obd_dev_lock);
381                                 return i;
382                         }
383                         break;
384                 }
385         }
386         spin_unlock(&obd_dev_lock);
387
388         return -1;
389 }
390
391 struct obd_device *class_name2obd(const char *name)
392 {
393         int dev = class_name2dev(name);
394
395         if (dev < 0 || dev > class_devno_max())
396                 return NULL;
397         return class_num2obd(dev);
398 }
399
400 int class_uuid2dev(struct obd_uuid *uuid)
401 {
402         int i;
403
404         spin_lock(&obd_dev_lock);
405         for (i = 0; i < class_devno_max(); i++) {
406                 struct obd_device *obd = class_num2obd(i);
407                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
408                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
409                         spin_unlock(&obd_dev_lock);
410                         return i;
411                 }
412         }
413         spin_unlock(&obd_dev_lock);
414
415         return -1;
416 }
417
418 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
419 {
420         int dev = class_uuid2dev(uuid);
421         if (dev < 0)
422                 return NULL;
423         return class_num2obd(dev);
424 }
425
426 /**
427  * Get obd device from ::obd_devs[]
428  *
429  * \param num [in] array index
430  *
431  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
432  *         otherwise return the obd device there.
433  */
434 struct obd_device *class_num2obd(int num)
435 {
436         struct obd_device *obd = NULL;
437
438         if (num < class_devno_max()) {
439                 obd = obd_devs[num];
440                 if (obd == NULL)
441                         return NULL;
442
443                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
444                          "%p obd_magic %08x != %08x\n",
445                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
446                 LASSERTF(obd->obd_minor == num,
447                          "%p obd_minor %0d != %0d\n",
448                          obd, obd->obd_minor, num);
449         }
450
451         return obd;
452 }
453
454 void class_obd_list(void)
455 {
456         char *status;
457         int i;
458
459         spin_lock(&obd_dev_lock);
460         for (i = 0; i < class_devno_max(); i++) {
461                 struct obd_device *obd = class_num2obd(i);
462                 if (obd == NULL)
463                         continue;
464                 if (obd->obd_stopping)
465                         status = "ST";
466                 else if (obd->obd_set_up)
467                         status = "UP";
468                 else if (obd->obd_attached)
469                         status = "AT";
470                 else
471                         status = "--";
472                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
473                          i, status, obd->obd_type->typ_name,
474                          obd->obd_name, obd->obd_uuid.uuid,
475                          atomic_read(&obd->obd_refcount));
476         }
477         spin_unlock(&obd_dev_lock);
478         return;
479 }
480
481 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
482    specified, then only the client with that uuid is returned,
483    otherwise any client connected to the tgt is returned. */
484 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
485                                           const char * typ_name,
486                                           struct obd_uuid *grp_uuid)
487 {
488         int i;
489
490         spin_lock(&obd_dev_lock);
491         for (i = 0; i < class_devno_max(); i++) {
492                 struct obd_device *obd = class_num2obd(i);
493                 if (obd == NULL)
494                         continue;
495                 if ((strncmp(obd->obd_type->typ_name, typ_name,
496                              strlen(typ_name)) == 0)) {
497                         if (obd_uuid_equals(tgt_uuid,
498                                             &obd->u.cli.cl_target_uuid) &&
499                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
500                                                          &obd->obd_uuid) : 1)) {
501                                 spin_unlock(&obd_dev_lock);
502                                 return obd;
503                         }
504                 }
505         }
506         spin_unlock(&obd_dev_lock);
507
508         return NULL;
509 }
510
511 /* Iterate the obd_device list looking devices have grp_uuid. Start
512    searching at *next, and if a device is found, the next index to look
513    at is saved in *next. If next is NULL, then the first matching device
514    will always be returned. */
515 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
516 {
517         int i;
518
519         if (next == NULL)
520                 i = 0;
521         else if (*next >= 0 && *next < class_devno_max())
522                 i = *next;
523         else
524                 return NULL;
525
526         spin_lock(&obd_dev_lock);
527         for (; i < class_devno_max(); i++) {
528                 struct obd_device *obd = class_num2obd(i);
529                 if (obd == NULL)
530                         continue;
531                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
532                         if (next != NULL)
533                                 *next = i+1;
534                         spin_unlock(&obd_dev_lock);
535                         return obd;
536                 }
537         }
538         spin_unlock(&obd_dev_lock);
539
540         return NULL;
541 }
542
543 /**
544  * to notify sptlrpc log for @fsname has changed, let every relevant OBD
545  * adjust sptlrpc settings accordingly.
546  */
547 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
548 {
549         struct obd_device  *obd;
550         const char         *type;
551         int                 i, rc = 0, rc2;
552
553         LASSERT(namelen > 0);
554
555         spin_lock(&obd_dev_lock);
556         for (i = 0; i < class_devno_max(); i++) {
557                 obd = class_num2obd(i);
558
559                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
560                         continue;
561
562                 /* only notify mdc, osc, mdt, ost */
563                 type = obd->obd_type->typ_name;
564                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
565                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
566                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
567                     strcmp(type, LUSTRE_OST_NAME) != 0)
568                         continue;
569
570                 if (strncmp(obd->obd_name, fsname, namelen))
571                         continue;
572
573                 class_incref(obd, __FUNCTION__, obd);
574                 spin_unlock(&obd_dev_lock);
575                 rc2 = obd_set_info_async(obd->obd_self_export,
576                                          sizeof(KEY_SPTLRPC_CONF),
577                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
578                 rc = rc ? rc : rc2;
579                 class_decref(obd, __FUNCTION__, obd);
580                 spin_lock(&obd_dev_lock);
581         }
582         spin_unlock(&obd_dev_lock);
583         return rc;
584 }
585 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
586
587 void obd_cleanup_caches(void)
588 {
589         int rc;
590
591         ENTRY;
592         if (obd_device_cachep) {
593                 rc = cfs_mem_cache_destroy(obd_device_cachep);
594                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
595                 obd_device_cachep = NULL;
596         }
597         if (obdo_cachep) {
598                 rc = cfs_mem_cache_destroy(obdo_cachep);
599                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
600                 obdo_cachep = NULL;
601         }
602         if (import_cachep) {
603                 rc = cfs_mem_cache_destroy(import_cachep);
604                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
605                 import_cachep = NULL;
606         }
607         if (capa_cachep) {
608                 rc = cfs_mem_cache_destroy(capa_cachep);
609                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
610                 capa_cachep = NULL;
611         }
612         EXIT;
613 }
614
615 int obd_init_caches(void)
616 {
617         ENTRY;
618
619         LASSERT(obd_device_cachep == NULL);
620         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
621                                                  sizeof(struct obd_device),
622                                                  0, 0);
623         if (!obd_device_cachep)
624                 GOTO(out, -ENOMEM);
625
626         LASSERT(obdo_cachep == NULL);
627         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
628                                            0, 0);
629         if (!obdo_cachep)
630                 GOTO(out, -ENOMEM);
631
632         LASSERT(import_cachep == NULL);
633         import_cachep = cfs_mem_cache_create("ll_import_cache",
634                                              sizeof(struct obd_import),
635                                              0, 0);
636         if (!import_cachep)
637                 GOTO(out, -ENOMEM);
638
639         LASSERT(capa_cachep == NULL);
640         capa_cachep = cfs_mem_cache_create("capa_cache",
641                                            sizeof(struct obd_capa), 0, 0);
642         if (!capa_cachep)
643                 GOTO(out, -ENOMEM);
644
645         RETURN(0);
646  out:
647         obd_cleanup_caches();
648         RETURN(-ENOMEM);
649
650 }
651
652 /* map connection to client */
653 struct obd_export *class_conn2export(struct lustre_handle *conn)
654 {
655         struct obd_export *export;
656         ENTRY;
657
658         if (!conn) {
659                 CDEBUG(D_CACHE, "looking for null handle\n");
660                 RETURN(NULL);
661         }
662
663         if (conn->cookie == -1) {  /* this means assign a new connection */
664                 CDEBUG(D_CACHE, "want a new connection\n");
665                 RETURN(NULL);
666         }
667
668         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
669         export = class_handle2object(conn->cookie);
670         RETURN(export);
671 }
672
673 struct obd_device *class_exp2obd(struct obd_export *exp)
674 {
675         if (exp)
676                 return exp->exp_obd;
677         return NULL;
678 }
679
680 struct obd_device *class_conn2obd(struct lustre_handle *conn)
681 {
682         struct obd_export *export;
683         export = class_conn2export(conn);
684         if (export) {
685                 struct obd_device *obd = export->exp_obd;
686                 class_export_put(export);
687                 return obd;
688         }
689         return NULL;
690 }
691
692 struct obd_import *class_exp2cliimp(struct obd_export *exp)
693 {
694         struct obd_device *obd = exp->exp_obd;
695         if (obd == NULL)
696                 return NULL;
697         return obd->u.cli.cl_import;
698 }
699
700 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
701 {
702         struct obd_device *obd = class_conn2obd(conn);
703         if (obd == NULL)
704                 return NULL;
705         return obd->u.cli.cl_import;
706 }
707
708 /* Export management functions */
709 static void class_export_destroy(struct obd_export *exp)
710 {
711         struct obd_device *obd = exp->exp_obd;
712         ENTRY;
713
714         LASSERT (atomic_read(&exp->exp_refcount) == 0);
715
716         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
717                exp->exp_client_uuid.uuid, obd->obd_name);
718
719         LASSERT(obd != NULL);
720
721         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
722         if (exp->exp_connection)
723                 ptlrpc_put_connection_superhack(exp->exp_connection);
724
725         LASSERT(list_empty(&exp->exp_outstanding_replies));
726         LASSERT(list_empty(&exp->exp_uncommitted_replies));
727         LASSERT(list_empty(&exp->exp_req_replay_queue));
728         LASSERT(list_empty(&exp->exp_queued_rpc));
729         obd_destroy_export(exp);
730         class_decref(obd, "export", exp);
731
732         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
733         EXIT;
734 }
735
/**
 * Addref callback handed to class_handle_hash() for export handles.
 *
 * \param export [in] the struct obd_export to take a reference on
 */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
740
741 struct obd_export *class_export_get(struct obd_export *exp)
742 {
743         atomic_inc(&exp->exp_refcount);
744         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
745                atomic_read(&exp->exp_refcount));
746         return exp;
747 }
748 EXPORT_SYMBOL(class_export_get);
749
750 void class_export_put(struct obd_export *exp)
751 {
752         LASSERT(exp != NULL);
753         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
754                atomic_read(&exp->exp_refcount) - 1);
755         LASSERT(atomic_read(&exp->exp_refcount) > 0);
756         LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
757
758         if (atomic_dec_and_test(&exp->exp_refcount)) {
759                 CDEBUG(D_IOCTL, "final put %p/%s\n",
760                        exp, exp->exp_client_uuid.uuid);
761                 obd_zombie_export_add(exp);
762         }
763 }
764 EXPORT_SYMBOL(class_export_put);
765
766 /* Creates a new export, adds it to the hash table, and returns a
767  * pointer to it. The refcount is 2: one for the hash reference, and
768  * one for the pointer returned by this function. */
769 struct obd_export *class_new_export(struct obd_device *obd,
770                                     struct obd_uuid *cluuid)
771 {
772         struct obd_export *export;
773         int rc = 0;
774
775         OBD_ALLOC_PTR(export);
776         if (!export)
777                 return ERR_PTR(-ENOMEM);
778
779         export->exp_conn_cnt = 0;
780         export->exp_lock_hash = NULL;
781         atomic_set(&export->exp_refcount, 2);
782         atomic_set(&export->exp_rpc_count, 0);
783         export->exp_obd = obd;
784         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
785         spin_lock_init(&export->exp_uncommitted_replies_lock);
786         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
787         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
788         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
789         CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
790         class_handle_hash(&export->exp_handle, export_handle_addref);
791         export->exp_last_request_time = cfs_time_current_sec();
792         spin_lock_init(&export->exp_lock);
793         INIT_HLIST_NODE(&export->exp_uuid_hash);
794         INIT_HLIST_NODE(&export->exp_nid_hash);
795
796         export->exp_sp_peer = LUSTRE_SP_ANY;
797         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
798         export->exp_client_uuid = *cluuid;
799         obd_init_export(export);
800
801         spin_lock(&obd->obd_dev_lock);
802         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
803                 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
804                                             &export->exp_uuid_hash);
805                 if (rc != 0) {
806                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
807                                       obd->obd_name, cluuid->uuid, rc);
808                         spin_unlock(&obd->obd_dev_lock);
809                         class_handle_unhash(&export->exp_handle);
810                         OBD_FREE_PTR(export);
811                         return ERR_PTR(-EALREADY);
812                 }
813         }
814
815         LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
816         class_incref(obd, "export", export);
817         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
818         list_add_tail(&export->exp_obd_chain_timed,
819                       &export->exp_obd->obd_exports_timed);
820         export->exp_obd->obd_num_exports++;
821         spin_unlock(&obd->obd_dev_lock);
822
823         return export;
824 }
825 EXPORT_SYMBOL(class_new_export);
826
827 void class_unlink_export(struct obd_export *exp)
828 {
829         class_handle_unhash(&exp->exp_handle);
830
831         spin_lock(&exp->exp_obd->obd_dev_lock);
832         /* delete an uuid-export hashitem from hashtables */
833         if (!hlist_unhashed(&exp->exp_uuid_hash))
834                 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
835                                 &exp->exp_client_uuid,
836                                 &exp->exp_uuid_hash);
837
838         list_del_init(&exp->exp_obd_chain);
839         list_del_init(&exp->exp_obd_chain_timed);
840         exp->exp_obd->obd_num_exports--;
841         spin_unlock(&exp->exp_obd->obd_dev_lock);
842
843         /* Keep these counter valid always */
844         spin_lock_bh(&exp->exp_obd->obd_processing_task_lock);
845         if (exp->exp_delayed)
846                 exp->exp_obd->obd_delayed_clients--;
847         else if (exp->exp_in_recovery)
848                 exp->exp_obd->obd_recoverable_clients--;
849         else if (exp->exp_obd->obd_recovering)
850                 exp->exp_obd->obd_max_recoverable_clients--;
851         spin_unlock_bh(&exp->exp_obd->obd_processing_task_lock);
852         class_export_put(exp);
853 }
854 EXPORT_SYMBOL(class_unlink_export);
855
856 /* Import management functions */
857 void class_import_destroy(struct obd_import *imp)
858 {
859         ENTRY;
860
861         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
862                 imp->imp_obd->obd_name);
863
864         LASSERT(atomic_read(&imp->imp_refcount) == 0);
865
866         ptlrpc_put_connection_superhack(imp->imp_connection);
867
868         while (!list_empty(&imp->imp_conn_list)) {
869                 struct obd_import_conn *imp_conn;
870
871                 imp_conn = list_entry(imp->imp_conn_list.next,
872                                       struct obd_import_conn, oic_item);
873                 list_del_init(&imp_conn->oic_item);
874                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
875                 OBD_FREE(imp_conn, sizeof(*imp_conn));
876         }
877
878         LASSERT(imp->imp_sec == NULL);
879         class_decref(imp->imp_obd, "import", imp);
880         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
881         EXIT;
882 }
883
/**
 * Handle-table callback: take a reference on the import behind a handle.
 * Registered with class_handle_hash() in class_new_import().
 */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
888
889 struct obd_import *class_import_get(struct obd_import *import)
890 {
891         LASSERT(atomic_read(&import->imp_refcount) >= 0);
892         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
893         atomic_inc(&import->imp_refcount);
894         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
895                atomic_read(&import->imp_refcount), 
896                import->imp_obd->obd_name);
897         return import;
898 }
899 EXPORT_SYMBOL(class_import_get);
900
901 void class_import_put(struct obd_import *imp)
902 {
903         ENTRY;
904
905         LASSERT(atomic_read(&imp->imp_refcount) > 0);
906         LASSERT(atomic_read(&imp->imp_refcount) < 0x5a5a5a);
907         LASSERT(list_empty(&imp->imp_zombie_chain));
908
909         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
910                atomic_read(&imp->imp_refcount) - 1, 
911                imp->imp_obd->obd_name);
912
913         if (atomic_dec_and_test(&imp->imp_refcount)) {
914                 CDEBUG(D_INFO, "final put import %p\n", imp);
915                 obd_zombie_import_add(imp);
916         }
917
918         EXIT;
919 }
920 EXPORT_SYMBOL(class_import_put);
921
922 static void init_imp_at(struct imp_at *at) {
923         int i;
924         at_init(&at->iat_net_latency, 0, 0);
925         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
926                 /* max service estimates are tracked on the server side, so
927                    don't use the AT history here, just use the last reported
928                    val. (But keep hist for proc histogram, worst_ever) */
929                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
930                         AT_FLG_NOHIST);
931         }
932 }
933
934 struct obd_import *class_new_import(struct obd_device *obd)
935 {
936         struct obd_import *imp;
937
938         OBD_ALLOC(imp, sizeof(*imp));
939         if (imp == NULL)
940                 return NULL;
941
942         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
943         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
944         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
945         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
946         spin_lock_init(&imp->imp_lock);
947         imp->imp_last_success_conn = 0;
948         imp->imp_state = LUSTRE_IMP_NEW;
949         imp->imp_obd = class_incref(obd, "import", imp);
950         sema_init(&imp->imp_sec_mutex, 1);
951         cfs_waitq_init(&imp->imp_recovery_waitq);
952
953         atomic_set(&imp->imp_refcount, 2);
954         atomic_set(&imp->imp_unregistering, 0);
955         atomic_set(&imp->imp_inflight, 0);
956         atomic_set(&imp->imp_replay_inflight, 0);
957         atomic_set(&imp->imp_inval_count, 0);
958         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
959         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
960         class_handle_hash(&imp->imp_handle, import_handle_addref);
961         init_imp_at(&imp->imp_at);
962
963         /* the default magic is V2, will be used in connect RPC, and
964          * then adjusted according to the flags in request/reply. */
965         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
966
967         return imp;
968 }
969 EXPORT_SYMBOL(class_new_import);
970
971 void class_destroy_import(struct obd_import *import)
972 {
973         LASSERT(import != NULL);
974         LASSERT(import != LP_POISON);
975
976         class_handle_unhash(&import->imp_handle);
977
978         spin_lock(&import->imp_lock);
979         import->imp_generation++;
980         spin_unlock(&import->imp_lock);
981         class_import_put(import);
982 }
983 EXPORT_SYMBOL(class_destroy_import);
984
985 /* A connection defines an export context in which preallocation can
986    be managed. This releases the export pointer reference, and returns
987    the export handle, so the export refcount is 1 when this function
988    returns. */
989 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
990                   struct obd_uuid *cluuid)
991 {
992         struct obd_export *export;
993         LASSERT(conn != NULL);
994         LASSERT(obd != NULL);
995         LASSERT(cluuid != NULL);
996         ENTRY;
997
998         export = class_new_export(obd, cluuid);
999         if (IS_ERR(export))
1000                 RETURN(PTR_ERR(export));
1001
1002         conn->cookie = export->exp_handle.h_cookie;
1003         class_export_put(export);
1004
1005         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1006                cluuid->uuid, conn->cookie);
1007         RETURN(0);
1008 }
1009 EXPORT_SYMBOL(class_connect);
1010
1011 /* if export is involved in recovery then clean up related things */
1012 void class_export_recovery_cleanup(struct obd_export *exp)
1013 {
1014         struct obd_device *obd = exp->exp_obd;
1015
1016         spin_lock_bh(&obd->obd_processing_task_lock);
1017         if (obd->obd_recovering && exp->exp_in_recovery) {
1018                 spin_lock(&exp->exp_lock);
1019                 exp->exp_in_recovery = 0;
1020                 spin_unlock(&exp->exp_lock);
1021                 obd->obd_connected_clients--;
1022                 /* each connected client is counted as recoverable */
1023                 obd->obd_recoverable_clients--;
1024                 if (exp->exp_req_replay_needed) {
1025                         spin_lock(&exp->exp_lock);
1026                         exp->exp_req_replay_needed = 0;
1027                         spin_unlock(&exp->exp_lock);
1028                         LASSERT(atomic_read(&obd->obd_req_replay_clients));
1029                         atomic_dec(&obd->obd_req_replay_clients);
1030                 }
1031                 if (exp->exp_lock_replay_needed) {
1032                         spin_lock(&exp->exp_lock);
1033                         exp->exp_lock_replay_needed = 0;
1034                         spin_unlock(&exp->exp_lock);
1035                         LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1036                         atomic_dec(&obd->obd_lock_replay_clients);
1037                 }
1038         }
1039         spin_unlock_bh(&obd->obd_processing_task_lock);
1040 }
1041
1042 /* This function removes two references from the export: one for the
1043  * hash entry and one for the export pointer passed in.  The export
1044  * pointer passed to this function is destroyed should not be used
1045  * again. */
1046 int class_disconnect(struct obd_export *export)
1047 {
1048         int already_disconnected;
1049         ENTRY;
1050
1051         if (export == NULL) {
1052                 fixme();
1053                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1054                 RETURN(-EINVAL);
1055         }
1056
1057         spin_lock(&export->exp_lock);
1058         already_disconnected = export->exp_disconnected;
1059         export->exp_disconnected = 1;
1060
1061         if (!hlist_unhashed(&export->exp_nid_hash))
1062                 lustre_hash_del(export->exp_obd->obd_nid_hash,
1063                                 &export->exp_connection->c_peer.nid,
1064                                 &export->exp_nid_hash);
1065
1066         spin_unlock(&export->exp_lock);
1067
1068         /* class_cleanup(), abort_recovery(), and class_fail_export()
1069          * all end up in here, and if any of them race we shouldn't
1070          * call extra class_export_puts(). */
1071         if (already_disconnected)
1072                 RETURN(0);
1073
1074         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1075                export->exp_handle.h_cookie);
1076
1077         class_export_recovery_cleanup(export);
1078         class_unlink_export(export);
1079         class_export_put(export);
1080         RETURN(0);
1081 }
1082
1083 static void class_disconnect_export_list(struct list_head *list,
1084                                          enum obd_option flags)
1085 {
1086         int rc;
1087         struct lustre_handle fake_conn;
1088         struct obd_export *fake_exp, *exp;
1089         ENTRY;
1090
1091         /* It's possible that an export may disconnect itself, but
1092          * nothing else will be added to this list. */
1093         while (!list_empty(list)) {
1094                 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1095                 class_export_get(exp);
1096
1097                 spin_lock(&exp->exp_lock);
1098                 exp->exp_flags = flags;
1099                 spin_unlock(&exp->exp_lock);
1100
1101                 if (obd_uuid_equals(&exp->exp_client_uuid,
1102                                     &exp->exp_obd->obd_uuid)) {
1103                         CDEBUG(D_HA,
1104                                "exp %p export uuid == obd uuid, don't discon\n",
1105                                exp);
1106                         /* Need to delete this now so we don't end up pointing
1107                          * to work_list later when this export is cleaned up. */
1108                         list_del_init(&exp->exp_obd_chain);
1109                         class_export_put(exp);
1110                         continue;
1111                 }
1112
1113                 fake_conn.cookie = exp->exp_handle.h_cookie;
1114                 fake_exp = class_conn2export(&fake_conn);
1115                 if (!fake_exp) {
1116                         class_export_put(exp);
1117                         continue;
1118                 }
1119
1120                 spin_lock(&fake_exp->exp_lock);
1121                 fake_exp->exp_flags = flags;
1122                 spin_unlock(&fake_exp->exp_lock);
1123
1124                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1125                        "last request at "CFS_TIME_T"\n",
1126                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1127                        exp, exp->exp_last_request_time);
1128                 rc = obd_disconnect(fake_exp);
1129                 class_export_put(exp);
1130         }
1131         EXIT;
1132 }
1133
1134 void class_disconnect_exports(struct obd_device *obd)
1135 {
1136         struct list_head work_list;
1137         ENTRY;
1138
1139         /* Move all of the exports from obd_exports to a work list, en masse. */
1140         CFS_INIT_LIST_HEAD(&work_list);
1141         spin_lock(&obd->obd_dev_lock);
1142         list_splice_init(&obd->obd_exports, &work_list);
1143         list_splice_init(&obd->obd_delayed_exports, &work_list);
1144         spin_unlock(&obd->obd_dev_lock);
1145
1146         if (!list_empty(&work_list)) {
1147                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1148                        "disconnecting them\n", obd->obd_minor, obd);
1149                 class_disconnect_export_list(&work_list,
1150                                              exp_flags_from_obd(obd));
1151         } else
1152                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1153                        obd->obd_minor, obd);
1154         EXIT;
1155 }
1156 EXPORT_SYMBOL(class_disconnect_exports);
1157
1158 /* Remove exports that have not completed recovery.
1159  */
1160 int class_disconnect_stale_exports(struct obd_device *obd,
1161                                    int (*test_export)(struct obd_export *),
1162                                    enum obd_option flags)
1163 {
1164         struct list_head work_list;
1165         struct list_head *pos, *n;
1166         struct obd_export *exp;
1167         int cnt = 0;
1168         ENTRY;
1169
1170         CFS_INIT_LIST_HEAD(&work_list);
1171         spin_lock(&obd->obd_dev_lock);
1172         list_for_each_safe(pos, n, &obd->obd_exports) {
1173                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1174                 if (test_export(exp))
1175                         continue;
1176
1177                 list_move(&exp->exp_obd_chain, &work_list);
1178                 /* don't count self-export as client */
1179                 if (obd_uuid_equals(&exp->exp_client_uuid,
1180                                      &exp->exp_obd->obd_uuid))
1181                         continue;
1182
1183                 cnt++;
1184                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1185                        obd->obd_name, exp->exp_client_uuid.uuid,
1186                        exp->exp_connection == NULL ? "<unknown>" :
1187                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1188         }
1189         spin_unlock(&obd->obd_dev_lock);
1190
1191         CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
1192                obd->obd_name, cnt);
1193         class_disconnect_export_list(&work_list, flags);
1194         RETURN(cnt);
1195 }
1196 EXPORT_SYMBOL(class_disconnect_stale_exports);
1197
1198 void class_fail_export(struct obd_export *exp)
1199 {
1200         int rc, already_failed;
1201
1202         spin_lock(&exp->exp_lock);
1203         already_failed = exp->exp_failed;
1204         exp->exp_failed = 1;
1205         spin_unlock(&exp->exp_lock);
1206
1207         if (already_failed) {
1208                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1209                        exp, exp->exp_client_uuid.uuid);
1210                 return;
1211         }
1212
1213         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1214                exp, exp->exp_client_uuid.uuid);
1215
1216         if (obd_dump_on_timeout)
1217                 libcfs_debug_dumplog();
1218
1219         /* Most callers into obd_disconnect are removing their own reference
1220          * (request, for example) in addition to the one from the hash table.
1221          * We don't have such a reference here, so make one. */
1222         class_export_get(exp);
1223         rc = obd_disconnect(exp);
1224         if (rc)
1225                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1226         else
1227                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1228                        exp, exp->exp_client_uuid.uuid);
1229 }
1230 EXPORT_SYMBOL(class_fail_export);
1231
1232 char *obd_export_nid2str(struct obd_export *exp)
1233 {
1234         if (exp->exp_connection != NULL)
1235                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1236
1237         return "(no nid)";
1238 }
1239 EXPORT_SYMBOL(obd_export_nid2str);
1240
1241 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1242 {
1243         struct obd_export *doomed_exp = NULL;
1244         int exports_evicted = 0;
1245
1246         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1247
1248         do {
1249                 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1250                 if (doomed_exp == NULL)
1251                         break;
1252
1253                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1254                          "nid %s found, wanted nid %s, requested nid %s\n",
1255                          obd_export_nid2str(doomed_exp),
1256                          libcfs_nid2str(nid_key), nid);
1257                 LASSERTF(doomed_exp != obd->obd_self_export,
1258                          "self-export is hashed by NID?\n");
1259                 exports_evicted++;
1260                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1261                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1262                        exports_evicted);
1263                 class_fail_export(doomed_exp);
1264                 class_export_put(doomed_exp);
1265         } while (1);
1266
1267         if (!exports_evicted)
1268                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1269                        obd->obd_name, nid);
1270         return exports_evicted;
1271 }
1272 EXPORT_SYMBOL(obd_export_evict_by_nid);
1273
1274 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1275 {
1276         struct obd_export *doomed_exp = NULL;
1277         struct obd_uuid doomed_uuid;
1278         int exports_evicted = 0;
1279
1280         obd_str2uuid(&doomed_uuid, uuid);
1281         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1282                 CERROR("%s: can't evict myself\n", obd->obd_name);
1283                 return exports_evicted;
1284         }
1285
1286         doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1287
1288         if (doomed_exp == NULL) {
1289                 CERROR("%s: can't disconnect %s: no exports found\n",
1290                        obd->obd_name, uuid);
1291         } else {
1292                 CWARN("%s: evicting %s at adminstrative request\n",
1293                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1294                 class_fail_export(doomed_exp);
1295                 class_export_put(doomed_exp);
1296                 exports_evicted++;
1297         }
1298
1299         return exports_evicted;
1300 }
1301 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1302
1303 /**
1304  * kill zombie imports and exports
1305  */
1306 void obd_zombie_impexp_cull(void)
1307 {
1308         struct obd_import *import;
1309         struct obd_export *export;
1310         ENTRY;
1311
1312         do {
1313                 spin_lock(&obd_zombie_impexp_lock);
1314
1315                 import = NULL;
1316                 if (!list_empty(&obd_zombie_imports)) {
1317                         import = list_entry(obd_zombie_imports.next,
1318                                             struct obd_import,
1319                                             imp_zombie_chain);
1320                         list_del_init(&import->imp_zombie_chain);
1321                 }
1322
1323                 export = NULL;
1324                 if (!list_empty(&obd_zombie_exports)) {
1325                         export = list_entry(obd_zombie_exports.next,
1326                                             struct obd_export,
1327                                             exp_obd_chain);
1328                         list_del_init(&export->exp_obd_chain);
1329                 }
1330
1331                 spin_unlock(&obd_zombie_impexp_lock);
1332
1333                 if (import != NULL)
1334                         class_import_destroy(import);
1335
1336                 if (export != NULL)
1337                         class_export_destroy(export);
1338
1339         } while (import != NULL || export != NULL);
1340         EXIT;
1341 }
1342
1343 static struct completion        obd_zombie_start;
1344 static struct completion        obd_zombie_stop;
1345 static unsigned long            obd_zombie_flags;
1346 static cfs_waitq_t              obd_zombie_waitq;
1347
1348 enum {
1349         OBD_ZOMBIE_STOP   = 1 << 1
1350 };
1351
1352 /**
1353  * check for work for kill zombie import/export thread.
1354  */
1355 static int obd_zombie_impexp_check(void *arg)
1356 {
1357         int rc;
1358
1359         spin_lock(&obd_zombie_impexp_lock);
1360         rc = list_empty(&obd_zombie_imports) &&
1361              list_empty(&obd_zombie_exports) &&
1362              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1363
1364         spin_unlock(&obd_zombie_impexp_lock);
1365
1366         RETURN(rc);
1367 }
1368
1369 /**
1370  * Add export to the obd_zombe thread and notify it.
1371  */
1372 static void obd_zombie_export_add(struct obd_export *exp) {
1373         spin_lock(&obd_zombie_impexp_lock);
1374         LASSERT(list_empty(&exp->exp_obd_chain));
1375         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1376         spin_unlock(&obd_zombie_impexp_lock);
1377
1378         if (obd_zombie_impexp_notify != NULL)
1379                 obd_zombie_impexp_notify();
1380 }
1381
1382 /**
1383  * Add import to the obd_zombe thread and notify it.
1384  */
1385 static void obd_zombie_import_add(struct obd_import *imp) {
1386         spin_lock(&obd_zombie_impexp_lock);
1387         LASSERT(list_empty(&imp->imp_zombie_chain));
1388         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1389         spin_unlock(&obd_zombie_impexp_lock);
1390
1391         if (obd_zombie_impexp_notify != NULL)
1392                 obd_zombie_impexp_notify();
1393 }
1394
1395 /**
1396  * notify import/export destroy thread about new zombie.
1397  */
1398 static void obd_zombie_impexp_notify(void)
1399 {
1400         cfs_waitq_signal(&obd_zombie_waitq);
1401 }
1402
1403 /**
1404  * check whether obd_zombie is idle
1405  */
1406 static int obd_zombie_is_idle(void)
1407 {
1408         int rc;
1409
1410         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1411         spin_lock(&obd_zombie_impexp_lock);
1412         rc = list_empty(&obd_zombie_imports) &&
1413              list_empty(&obd_zombie_exports);
1414         spin_unlock(&obd_zombie_impexp_lock);
1415         return rc;
1416 }
1417
1418 /**
1419  * wait when obd_zombie import/export queues become empty
1420  */
1421 void obd_zombie_barrier(void)
1422 {
1423         struct l_wait_info lwi = { 0 };
1424         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1425 }
1426 EXPORT_SYMBOL(obd_zombie_barrier);
1427
1428 #ifdef __KERNEL__
1429
1430 /**
1431  * destroy zombie export/import thread.
1432  */
1433 static int obd_zombie_impexp_thread(void *unused)
1434 {
1435         int rc;
1436
1437         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1438                 complete(&obd_zombie_start);
1439                 RETURN(rc);
1440         }
1441
1442         complete(&obd_zombie_start);
1443
1444         while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1445                 struct l_wait_info lwi = { 0 };
1446
1447                 l_wait_event(obd_zombie_waitq, 
1448                              !obd_zombie_impexp_check(NULL), &lwi);
1449                 obd_zombie_impexp_cull();
1450
1451                 /* 
1452                  * Notify obd_zombie_barrier callers that queues
1453                  * may be empty.
1454                  */
1455                 cfs_waitq_signal(&obd_zombie_waitq);
1456         }
1457
1458         complete(&obd_zombie_stop);
1459
1460         RETURN(0);
1461 }
1462
1463 #else /* ! KERNEL */
1464
1465 static atomic_t zombie_recur = ATOMIC_INIT(0);
1466 static void *obd_zombie_impexp_work_cb;
1467 static void *obd_zombie_impexp_idle_cb;
1468
1469 int obd_zombie_impexp_kill(void *arg)
1470 {
1471         int rc = 0;
1472
1473         if (atomic_inc_return(&zombie_recur) == 1) {
1474                 obd_zombie_impexp_cull();
1475                 rc = 1;
1476         }
1477         atomic_dec(&zombie_recur);
1478         return rc;
1479 }
1480
1481 #endif
1482
1483 /**
1484  * start destroy zombie import/export thread
1485  */
1486 int obd_zombie_impexp_init(void)
1487 {
1488         int rc;
1489
1490         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1491         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1492         spin_lock_init(&obd_zombie_impexp_lock);
1493         init_completion(&obd_zombie_start);
1494         init_completion(&obd_zombie_stop);
1495         cfs_waitq_init(&obd_zombie_waitq);
1496
1497 #ifdef __KERNEL__
1498         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1499         if (rc < 0)
1500                 RETURN(rc);
1501
1502         wait_for_completion(&obd_zombie_start);
1503 #else
1504
1505         obd_zombie_impexp_work_cb =
1506                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1507                                                  &obd_zombie_impexp_kill, NULL);
1508
1509         obd_zombie_impexp_idle_cb =
1510                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1511                                                  &obd_zombie_impexp_check, NULL);
1512         rc = 0;
1513 #endif
1514         RETURN(rc);
1515 }
1516 /**
1517  * stop destroy zombie import/export thread
1518  */
1519 void obd_zombie_impexp_stop(void)
1520 {
1521         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1522         obd_zombie_impexp_notify();
1523 #ifdef __KERNEL__
1524         wait_for_completion(&obd_zombie_stop);
1525 #else
1526         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1527         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1528 #endif
1529 }