Whamcloud - gitweb
4af4a6116d73f99a83d73ee0ade400289e6ba868
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
50
/* List of registered obd types (defined elsewhere); class_search_type()
 * walks it and class_register_type()/class_unregister_type() modify it. */
51 extern struct list_head obd_types;
/* Taken around every traversal or modification of obd_types in this file. */
52 spinlock_t obd_types_lock;
53
/* Caches created by obd_init_caches() and destroyed by obd_cleanup_caches():
 * one each for struct obd_device, struct obdo and struct obd_import. */
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
58
/* Zombie import/export bookkeeping.  In the visible code, class_export_put()
 * hands an export to obd_zombie_export_add() on the final put; the helpers
 * themselves are defined later in the file. */
59 struct list_head  obd_zombie_imports;
60 struct list_head  obd_zombie_exports;
61 spinlock_t        obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
63 static void obd_zombie_export_add(struct obd_export *exp);
64 static void obd_zombie_import_add(struct obd_import *imp);
65
/* Hook used by class_export_destroy()/class_import_destroy() to release a
 * ptlrpc connection.  It is not assigned in this part of the file --
 * presumably installed by the ptlrpc module; verify before relying on it. */
66 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67
68 /*
69  * support functions: we could use inter-module communication, but this
70  * is more portable to other OS's
71  */
72 static struct obd_device *obd_device_alloc(void)
73 {
74         struct obd_device *obd;
75
76         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
77         if (obd != NULL) {
78                 obd->obd_magic = OBD_DEVICE_MAGIC;
79         }
80         return obd;
81 }
82 EXPORT_SYMBOL(obd_device_alloc);
83
84 static void obd_device_free(struct obd_device *obd)
85 {
86         LASSERT(obd != NULL);
87         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
89         if (obd->obd_namespace != NULL) {
90                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91                        obd, obd->obd_namespace, obd->obd_force);
92                 LBUG();
93         }
94         lu_ref_fini(&obd->obd_reference);
95         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
96 }
97
98 struct obd_type *class_search_type(const char *name)
99 {
100         struct list_head *tmp;
101         struct obd_type *type;
102
103         spin_lock(&obd_types_lock);
104         list_for_each(tmp, &obd_types) {
105                 type = list_entry(tmp, struct obd_type, typ_chain);
106                 if (strcmp(type->typ_name, name) == 0) {
107                         spin_unlock(&obd_types_lock);
108                         return type;
109                 }
110         }
111         spin_unlock(&obd_types_lock);
112         return NULL;
113 }
114
115 struct obd_type *class_get_type(const char *name)
116 {
117         struct obd_type *type = class_search_type(name);
118
119 #ifdef CONFIG_KMOD
120         if (!type) {
121                 const char *modname = name;
122                 if (!request_module(modname)) {
123                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
124                         type = class_search_type(name);
125                 } else {
126                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
127                                            modname);
128                 }
129         }
130 #endif
131         if (type) {
132                 spin_lock(&type->obd_type_lock);
133                 type->typ_refcnt++;
134                 try_module_get(type->typ_dt_ops->o_owner);
135                 spin_unlock(&type->obd_type_lock);
136         }
137         return type;
138 }
139
140 void class_put_type(struct obd_type *type)
141 {
142         LASSERT(type);
143         spin_lock(&type->obd_type_lock);
144         type->typ_refcnt--;
145         module_put(type->typ_dt_ops->o_owner);
146         spin_unlock(&type->obd_type_lock);
147 }
148
149 #define CLASS_MAX_NAME 1024
150
151 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
152                         struct lprocfs_vars *vars, const char *name,
153                         struct lu_device_type *ldt)
154 {
155         struct obd_type *type;
156         int rc = 0;
157         ENTRY;
158
159         /* sanity check */
160         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
161
162         if (class_search_type(name)) {
163                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
164                 RETURN(-EEXIST);
165         }
166
167         rc = -ENOMEM;
168         OBD_ALLOC(type, sizeof(*type));
169         if (type == NULL)
170                 RETURN(rc);
171
172         OBD_ALLOC_PTR(type->typ_dt_ops);
173         OBD_ALLOC_PTR(type->typ_md_ops);
174         OBD_ALLOC(type->typ_name, strlen(name) + 1);
175
176         if (type->typ_dt_ops == NULL ||
177             type->typ_md_ops == NULL ||
178             type->typ_name == NULL)
179                 GOTO (failed, rc);
180
181         *(type->typ_dt_ops) = *dt_ops;
182         /* md_ops is optional */
183         if (md_ops)
184                 *(type->typ_md_ops) = *md_ops;
185         strcpy(type->typ_name, name);
186         spin_lock_init(&type->obd_type_lock);
187
188 #ifdef LPROCFS
189         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
190                                               vars, type);
191         if (IS_ERR(type->typ_procroot)) {
192                 rc = PTR_ERR(type->typ_procroot);
193                 type->typ_procroot = NULL;
194                 GOTO (failed, rc);
195         }
196 #endif
197         if (ldt != NULL) {
198                 type->typ_lu = ldt;
199                 rc = lu_device_type_init(ldt);
200                 if (rc != 0)
201                         GOTO (failed, rc);
202         }
203
204         spin_lock(&obd_types_lock);
205         list_add(&type->typ_chain, &obd_types);
206         spin_unlock(&obd_types_lock);
207
208         RETURN (0);
209
210  failed:
211         if (type->typ_name != NULL)
212                 OBD_FREE(type->typ_name, strlen(name) + 1);
213         if (type->typ_md_ops != NULL)
214                 OBD_FREE_PTR(type->typ_md_ops);
215         if (type->typ_dt_ops != NULL)
216                 OBD_FREE_PTR(type->typ_dt_ops);
217         OBD_FREE(type, sizeof(*type));
218         RETURN(rc);
219 }
220
221 int class_unregister_type(const char *name)
222 {
223         struct obd_type *type = class_search_type(name);
224         ENTRY;
225
226         if (!type) {
227                 CERROR("unknown obd type\n");
228                 RETURN(-EINVAL);
229         }
230
231         if (type->typ_refcnt) {
232                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
233                 /* This is a bad situation, let's make the best of it */
234                 /* Remove ops, but leave the name for debugging */
235                 OBD_FREE_PTR(type->typ_dt_ops);
236                 OBD_FREE_PTR(type->typ_md_ops);
237                 RETURN(-EBUSY);
238         }
239
240         if (type->typ_procroot) {
241                 lprocfs_remove(&type->typ_procroot);
242         }
243
244         if (type->typ_lu)
245                 lu_device_type_fini(type->typ_lu);
246
247         spin_lock(&obd_types_lock);
248         list_del(&type->typ_chain);
249         spin_unlock(&obd_types_lock);
250         OBD_FREE(type->typ_name, strlen(name) + 1);
251         if (type->typ_dt_ops != NULL)
252                 OBD_FREE_PTR(type->typ_dt_ops);
253         if (type->typ_md_ops != NULL)
254                 OBD_FREE_PTR(type->typ_md_ops);
255         OBD_FREE(type, sizeof(*type));
256         RETURN(0);
257 } /* class_unregister_type */
258
259 /**
260  * Create a new obd device.
261  *
262  * Find an empty slot in ::obd_devs[], create a new obd device in it.
263  *
264  * \param typename [in] obd device type string.
265  * \param name     [in] obd device name.
266  *
267  * \retval NULL if create fails, otherwise return the obd device
268  *         pointer created.
269  */
270 struct obd_device *class_newdev(const char *type_name, const char *name)
271 {
272         struct obd_device *result = NULL;
273         struct obd_device *newdev;
274         struct obd_type *type = NULL;
275         int i;
276         int new_obd_minor = 0;
277
278         if (strlen(name) >= MAX_OBD_NAME) {
279                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
280                 RETURN(ERR_PTR(-EINVAL));
281         }
282
283         type = class_get_type(type_name);
284         if (type == NULL){
285                 CERROR("OBD: unknown type: %s\n", type_name);
286                 RETURN(ERR_PTR(-ENODEV));
287         }
288
289         newdev = obd_device_alloc();
290         if (newdev == NULL) {
291                 class_put_type(type);
292                 RETURN(ERR_PTR(-ENOMEM));
293         }
294         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
295
296         spin_lock(&obd_dev_lock);
297         for (i = 0; i < class_devno_max(); i++) {
298                 struct obd_device *obd = class_num2obd(i);
299                 if (obd && obd->obd_name &&
300                     (strcmp(name, obd->obd_name) == 0)) {
301                         CERROR("Device %s already exists, won't add\n", name);
302                         if (result) {
303                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
304                                          "%p obd_magic %08x != %08x\n", result,
305                                          result->obd_magic, OBD_DEVICE_MAGIC);
306                                 LASSERTF(result->obd_minor == new_obd_minor,
307                                          "%p obd_minor %d != %d\n", result,
308                                          result->obd_minor, new_obd_minor);
309
310                                 obd_devs[result->obd_minor] = NULL;
311                                 result->obd_name[0]='\0';
312                          }
313                         result = ERR_PTR(-EEXIST);
314                         break;
315                 }
316                 if (!result && !obd) {
317                         result = newdev;
318                         result->obd_minor = i;
319                         new_obd_minor = i;
320                         result->obd_type = type;
321                         strncpy(result->obd_name, name,
322                                 sizeof(result->obd_name) - 1);
323                         obd_devs[i] = result;
324                 }
325         }
326         spin_unlock(&obd_dev_lock);
327
328         if (result == NULL && i >= class_devno_max()) {
329                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
330                        class_devno_max());
331                 result = ERR_PTR(-EOVERFLOW);
332         }
333
334         if (IS_ERR(result)) {
335                 obd_device_free(newdev);
336                 class_put_type(type);
337         } else {
338                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
339                        result->obd_name, result);
340         }
341         return result;
342 }
343
344 void class_release_dev(struct obd_device *obd)
345 {
346         struct obd_type *obd_type = obd->obd_type;
347
348         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
349                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
350         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
351                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
352         LASSERT(obd_type != NULL);
353
354         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
355                obd->obd_name,obd->obd_type->typ_name);
356
357         spin_lock(&obd_dev_lock);
358         obd_devs[obd->obd_minor] = NULL;
359         spin_unlock(&obd_dev_lock);
360         obd_device_free(obd);
361
362         class_put_type(obd_type);
363 }
364
365 int class_name2dev(const char *name)
366 {
367         int i;
368
369         if (!name)
370                 return -1;
371
372         spin_lock(&obd_dev_lock);
373         for (i = 0; i < class_devno_max(); i++) {
374                 struct obd_device *obd = class_num2obd(i);
375                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
376                         /* Make sure we finished attaching before we give
377                            out any references */
378                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
379                         if (obd->obd_attached) {
380                                 spin_unlock(&obd_dev_lock);
381                                 return i;
382                         }
383                         break;
384                 }
385         }
386         spin_unlock(&obd_dev_lock);
387
388         return -1;
389 }
390
391 struct obd_device *class_name2obd(const char *name)
392 {
393         int dev = class_name2dev(name);
394
395         if (dev < 0 || dev > class_devno_max())
396                 return NULL;
397         return class_num2obd(dev);
398 }
399
400 int class_uuid2dev(struct obd_uuid *uuid)
401 {
402         int i;
403
404         spin_lock(&obd_dev_lock);
405         for (i = 0; i < class_devno_max(); i++) {
406                 struct obd_device *obd = class_num2obd(i);
407                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
408                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
409                         spin_unlock(&obd_dev_lock);
410                         return i;
411                 }
412         }
413         spin_unlock(&obd_dev_lock);
414
415         return -1;
416 }
417
418 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
419 {
420         int dev = class_uuid2dev(uuid);
421         if (dev < 0)
422                 return NULL;
423         return class_num2obd(dev);
424 }
425
426 /**
427  * Get obd device from ::obd_devs[]
428  *
429  * \param num [in] array index
430  *
431  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
432  *         otherwise return the obd device there.
433  */
434 struct obd_device *class_num2obd(int num)
435 {
436         struct obd_device *obd = NULL;
437
438         if (num < class_devno_max()) {
439                 obd = obd_devs[num];
440                 if (obd == NULL)
441                         return NULL;
442
443                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
444                          "%p obd_magic %08x != %08x\n",
445                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
446                 LASSERTF(obd->obd_minor == num,
447                          "%p obd_minor %0d != %0d\n",
448                          obd, obd->obd_minor, num);
449         }
450
451         return obd;
452 }
453
454 void class_obd_list(void)
455 {
456         char *status;
457         int i;
458
459         spin_lock(&obd_dev_lock);
460         for (i = 0; i < class_devno_max(); i++) {
461                 struct obd_device *obd = class_num2obd(i);
462                 if (obd == NULL)
463                         continue;
464                 if (obd->obd_stopping)
465                         status = "ST";
466                 else if (obd->obd_set_up)
467                         status = "UP";
468                 else if (obd->obd_attached)
469                         status = "AT";
470                 else
471                         status = "--";
472                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
473                          i, status, obd->obd_type->typ_name,
474                          obd->obd_name, obd->obd_uuid.uuid,
475                          atomic_read(&obd->obd_refcount));
476         }
477         spin_unlock(&obd_dev_lock);
478         return;
479 }
480
481 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
482    specified, then only the client with that uuid is returned,
483    otherwise any client connected to the tgt is returned. */
484 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
485                                           const char * typ_name,
486                                           struct obd_uuid *grp_uuid)
487 {
488         int i;
489
490         spin_lock(&obd_dev_lock);
491         for (i = 0; i < class_devno_max(); i++) {
492                 struct obd_device *obd = class_num2obd(i);
493                 if (obd == NULL)
494                         continue;
495                 if ((strncmp(obd->obd_type->typ_name, typ_name,
496                              strlen(typ_name)) == 0)) {
497                         if (obd_uuid_equals(tgt_uuid,
498                                             &obd->u.cli.cl_target_uuid) &&
499                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
500                                                          &obd->obd_uuid) : 1)) {
501                                 spin_unlock(&obd_dev_lock);
502                                 return obd;
503                         }
504                 }
505         }
506         spin_unlock(&obd_dev_lock);
507
508         return NULL;
509 }
510
511 /* Iterate the obd_device list looking devices have grp_uuid. Start
512    searching at *next, and if a device is found, the next index to look
513    at is saved in *next. If next is NULL, then the first matching device
514    will always be returned. */
515 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
516 {
517         int i;
518
519         if (next == NULL)
520                 i = 0;
521         else if (*next >= 0 && *next < class_devno_max())
522                 i = *next;
523         else
524                 return NULL;
525
526         spin_lock(&obd_dev_lock);
527         for (; i < class_devno_max(); i++) {
528                 struct obd_device *obd = class_num2obd(i);
529                 if (obd == NULL)
530                         continue;
531                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
532                         if (next != NULL)
533                                 *next = i+1;
534                         spin_unlock(&obd_dev_lock);
535                         return obd;
536                 }
537         }
538         spin_unlock(&obd_dev_lock);
539
540         return NULL;
541 }
542
543 /**
544  * to notify sptlrpc log for @fsname has changed, let every relevant OBD
545  * adjust sptlrpc settings accordingly.
546  */
547 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
548 {
549         struct obd_device  *obd;
550         const char         *type;
551         int                 i, rc = 0, rc2;
552
553         LASSERT(namelen > 0);
554
555         spin_lock(&obd_dev_lock);
556         for (i = 0; i < class_devno_max(); i++) {
557                 obd = class_num2obd(i);
558
559                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
560                         continue;
561
562                 /* only notify mdc, osc, mdt, ost */
563                 type = obd->obd_type->typ_name;
564                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
565                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
566                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
567                     strcmp(type, LUSTRE_OST_NAME) != 0)
568                         continue;
569
570                 if (strncmp(obd->obd_name, fsname, namelen))
571                         continue;
572
573                 class_incref(obd, __FUNCTION__, obd);
574                 spin_unlock(&obd_dev_lock);
575                 rc2 = obd_set_info_async(obd->obd_self_export,
576                                          sizeof(KEY_SPTLRPC_CONF),
577                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
578                 rc = rc ? rc : rc2;
579                 class_decref(obd, __FUNCTION__, obd);
580                 spin_lock(&obd_dev_lock);
581         }
582         spin_unlock(&obd_dev_lock);
583         return rc;
584 }
585 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
586
587 void obd_cleanup_caches(void)
588 {
589         int rc;
590
591         ENTRY;
592         if (obd_device_cachep) {
593                 rc = cfs_mem_cache_destroy(obd_device_cachep);
594                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
595                 obd_device_cachep = NULL;
596         }
597         if (obdo_cachep) {
598                 rc = cfs_mem_cache_destroy(obdo_cachep);
599                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
600                 obdo_cachep = NULL;
601         }
602         if (import_cachep) {
603                 rc = cfs_mem_cache_destroy(import_cachep);
604                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
605                 import_cachep = NULL;
606         }
607         if (capa_cachep) {
608                 rc = cfs_mem_cache_destroy(capa_cachep);
609                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
610                 capa_cachep = NULL;
611         }
612         EXIT;
613 }
614
615 int obd_init_caches(void)
616 {
617         ENTRY;
618
619         LASSERT(obd_device_cachep == NULL);
620         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
621                                                  sizeof(struct obd_device),
622                                                  0, 0);
623         if (!obd_device_cachep)
624                 GOTO(out, -ENOMEM);
625
626         LASSERT(obdo_cachep == NULL);
627         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
628                                            0, 0);
629         if (!obdo_cachep)
630                 GOTO(out, -ENOMEM);
631
632         LASSERT(import_cachep == NULL);
633         import_cachep = cfs_mem_cache_create("ll_import_cache",
634                                              sizeof(struct obd_import),
635                                              0, 0);
636         if (!import_cachep)
637                 GOTO(out, -ENOMEM);
638
639         LASSERT(capa_cachep == NULL);
640         capa_cachep = cfs_mem_cache_create("capa_cache",
641                                            sizeof(struct obd_capa), 0, 0);
642         if (!capa_cachep)
643                 GOTO(out, -ENOMEM);
644
645         RETURN(0);
646  out:
647         obd_cleanup_caches();
648         RETURN(-ENOMEM);
649
650 }
651
652 /* map connection to client */
653 struct obd_export *class_conn2export(struct lustre_handle *conn)
654 {
655         struct obd_export *export;
656         ENTRY;
657
658         if (!conn) {
659                 CDEBUG(D_CACHE, "looking for null handle\n");
660                 RETURN(NULL);
661         }
662
663         if (conn->cookie == -1) {  /* this means assign a new connection */
664                 CDEBUG(D_CACHE, "want a new connection\n");
665                 RETURN(NULL);
666         }
667
668         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
669         export = class_handle2object(conn->cookie);
670         RETURN(export);
671 }
672
673 struct obd_device *class_exp2obd(struct obd_export *exp)
674 {
675         if (exp)
676                 return exp->exp_obd;
677         return NULL;
678 }
679
680 struct obd_device *class_conn2obd(struct lustre_handle *conn)
681 {
682         struct obd_export *export;
683         export = class_conn2export(conn);
684         if (export) {
685                 struct obd_device *obd = export->exp_obd;
686                 class_export_put(export);
687                 return obd;
688         }
689         return NULL;
690 }
691
692 struct obd_import *class_exp2cliimp(struct obd_export *exp)
693 {
694         struct obd_device *obd = exp->exp_obd;
695         if (obd == NULL)
696                 return NULL;
697         return obd->u.cli.cl_import;
698 }
699
700 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
701 {
702         struct obd_device *obd = class_conn2obd(conn);
703         if (obd == NULL)
704                 return NULL;
705         return obd->u.cli.cl_import;
706 }
707
708 /* Export management functions */
709 static void class_export_destroy(struct obd_export *exp)
710 {
711         struct obd_device *obd = exp->exp_obd;
712         ENTRY;
713
714         LASSERT (atomic_read(&exp->exp_refcount) == 0);
715
716         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
717                exp->exp_client_uuid.uuid, obd->obd_name);
718
719         LASSERT(obd != NULL);
720
721         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
722         if (exp->exp_connection)
723                 ptlrpc_put_connection_superhack(exp->exp_connection);
724
725         LASSERT(list_empty(&exp->exp_outstanding_replies));
726         LASSERT(list_empty(&exp->exp_uncommitted_replies));
727         LASSERT(list_empty(&exp->exp_req_replay_queue));
728         LASSERT(list_empty(&exp->exp_queued_rpc));
729         obd_destroy_export(exp);
730         class_decref(obd, "export", exp);
731
732         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
733         EXIT;
734 }
735
/* Addref callback registered with class_handle_hash() for exports. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
740
741 struct obd_export *class_export_get(struct obd_export *exp)
742 {
743         atomic_inc(&exp->exp_refcount);
744         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
745                atomic_read(&exp->exp_refcount));
746         return exp;
747 }
748 EXPORT_SYMBOL(class_export_get);
749
750 void class_export_put(struct obd_export *exp)
751 {
752         LASSERT(exp != NULL);
753         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
754                atomic_read(&exp->exp_refcount) - 1);
755         LASSERT(atomic_read(&exp->exp_refcount) > 0);
756         LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
757
758         if (atomic_dec_and_test(&exp->exp_refcount)) {
759                 LASSERT(!list_empty(&exp->exp_obd_chain));
760                 CDEBUG(D_IOCTL, "final put %p/%s\n",
761                        exp, exp->exp_client_uuid.uuid);
762                 obd_zombie_export_add(exp);
763         }
764 }
765 EXPORT_SYMBOL(class_export_put);
766
767 /* Creates a new export, adds it to the hash table, and returns a
768  * pointer to it. The refcount is 2: one for the hash reference, and
769  * one for the pointer returned by this function. */
770 struct obd_export *class_new_export(struct obd_device *obd,
771                                     struct obd_uuid *cluuid)
772 {
773         struct obd_export *export;
774         int rc = 0;
775
776         OBD_ALLOC_PTR(export);
777         if (!export)
778                 return ERR_PTR(-ENOMEM);
779
780         export->exp_conn_cnt = 0;
781         export->exp_lock_hash = NULL;
782         atomic_set(&export->exp_refcount, 2);
783         atomic_set(&export->exp_rpc_count, 0);
784         atomic_set(&export->exp_cb_count, 0);
785         export->exp_obd = obd;
786         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
787         spin_lock_init(&export->exp_uncommitted_replies_lock);
788         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
789         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
790         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
791         CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
792         class_handle_hash(&export->exp_handle, export_handle_addref);
793         export->exp_last_request_time = cfs_time_current_sec();
794         spin_lock_init(&export->exp_lock);
795         INIT_HLIST_NODE(&export->exp_uuid_hash);
796         INIT_HLIST_NODE(&export->exp_nid_hash);
797
798         export->exp_sp_peer = LUSTRE_SP_ANY;
799         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
800         export->exp_client_uuid = *cluuid;
801         obd_init_export(export);
802
803         spin_lock(&obd->obd_dev_lock);
804         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
805                 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
806                                             &export->exp_uuid_hash);
807                 if (rc != 0) {
808                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
809                                       obd->obd_name, cluuid->uuid, rc);
810                         spin_unlock(&obd->obd_dev_lock);
811                         class_handle_unhash(&export->exp_handle);
812                         OBD_FREE_PTR(export);
813                         return ERR_PTR(-EALREADY);
814                 }
815         }
816
817         LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
818         class_incref(obd, "export", export);
819         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
820         list_add_tail(&export->exp_obd_chain_timed,
821                       &export->exp_obd->obd_exports_timed);
822         export->exp_obd->obd_num_exports++;
823         spin_unlock(&obd->obd_dev_lock);
824
825         return export;
826 }
827 EXPORT_SYMBOL(class_new_export);
828
829 void class_unlink_export(struct obd_export *exp)
830 {
831         class_handle_unhash(&exp->exp_handle);
832
833         spin_lock(&exp->exp_obd->obd_dev_lock);
834         /* delete an uuid-export hashitem from hashtables */
835         if (!hlist_unhashed(&exp->exp_uuid_hash))
836                 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
837                                 &exp->exp_client_uuid,
838                                 &exp->exp_uuid_hash);
839
840         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
841         list_del_init(&exp->exp_obd_chain_timed);
842         exp->exp_obd->obd_num_exports--;
843         spin_unlock(&exp->exp_obd->obd_dev_lock);
844
845         /* Keep these counter valid always */
846         spin_lock_bh(&exp->exp_obd->obd_processing_task_lock);
847         if (exp->exp_delayed)
848                 exp->exp_obd->obd_delayed_clients--;
849         else if (exp->exp_in_recovery)
850                 exp->exp_obd->obd_recoverable_clients--;
851         else if (exp->exp_obd->obd_recovering)
852                 exp->exp_obd->obd_max_recoverable_clients--;
853         spin_unlock_bh(&exp->exp_obd->obd_processing_task_lock);
854         class_export_put(exp);
855 }
856 EXPORT_SYMBOL(class_unlink_export);
857
858 /* Import management functions */
859 void class_import_destroy(struct obd_import *imp)
860 {
861         ENTRY;
862
863         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
864                 imp->imp_obd->obd_name);
865
866         LASSERT(atomic_read(&imp->imp_refcount) == 0);
867
868         ptlrpc_put_connection_superhack(imp->imp_connection);
869
870         while (!list_empty(&imp->imp_conn_list)) {
871                 struct obd_import_conn *imp_conn;
872
873                 imp_conn = list_entry(imp->imp_conn_list.next,
874                                       struct obd_import_conn, oic_item);
875                 list_del_init(&imp_conn->oic_item);
876                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
877                 OBD_FREE(imp_conn, sizeof(*imp_conn));
878         }
879
880         LASSERT(imp->imp_sec == NULL);
881         class_decref(imp->imp_obd, "import", imp);
882         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
883         EXIT;
884 }
885
/* Handle-hash addref callback (see class_new_import()): takes an import
 * reference on the object passed in as an untyped pointer. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
890
891 struct obd_import *class_import_get(struct obd_import *import)
892 {
893         LASSERT(atomic_read(&import->imp_refcount) >= 0);
894         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
895         atomic_inc(&import->imp_refcount);
896         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
897                atomic_read(&import->imp_refcount), 
898                import->imp_obd->obd_name);
899         return import;
900 }
901 EXPORT_SYMBOL(class_import_get);
902
903 void class_import_put(struct obd_import *imp)
904 {
905         ENTRY;
906
907         LASSERT(atomic_read(&imp->imp_refcount) > 0);
908         LASSERT(atomic_read(&imp->imp_refcount) < 0x5a5a5a);
909         LASSERT(list_empty(&imp->imp_zombie_chain));
910
911         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
912                atomic_read(&imp->imp_refcount) - 1, 
913                imp->imp_obd->obd_name);
914
915         if (atomic_dec_and_test(&imp->imp_refcount)) {
916                 CDEBUG(D_INFO, "final put import %p\n", imp);
917                 obd_zombie_import_add(imp);
918         }
919
920         EXIT;
921 }
922 EXPORT_SYMBOL(class_import_put);
923
924 static void init_imp_at(struct imp_at *at) {
925         int i;
926         at_init(&at->iat_net_latency, 0, 0);
927         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
928                 /* max service estimates are tracked on the server side, so
929                    don't use the AT history here, just use the last reported
930                    val. (But keep hist for proc histogram, worst_ever) */
931                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
932                         AT_FLG_NOHIST);
933         }
934 }
935
936 struct obd_import *class_new_import(struct obd_device *obd)
937 {
938         struct obd_import *imp;
939
940         OBD_ALLOC(imp, sizeof(*imp));
941         if (imp == NULL)
942                 return NULL;
943
944         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
945         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
946         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
947         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
948         spin_lock_init(&imp->imp_lock);
949         imp->imp_last_success_conn = 0;
950         imp->imp_state = LUSTRE_IMP_NEW;
951         imp->imp_obd = class_incref(obd, "import", imp);
952         sema_init(&imp->imp_sec_mutex, 1);
953         cfs_waitq_init(&imp->imp_recovery_waitq);
954
955         atomic_set(&imp->imp_refcount, 2);
956         atomic_set(&imp->imp_unregistering, 0);
957         atomic_set(&imp->imp_inflight, 0);
958         atomic_set(&imp->imp_replay_inflight, 0);
959         atomic_set(&imp->imp_inval_count, 0);
960         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
961         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
962         class_handle_hash(&imp->imp_handle, import_handle_addref);
963         init_imp_at(&imp->imp_at);
964
965         /* the default magic is V2, will be used in connect RPC, and
966          * then adjusted according to the flags in request/reply. */
967         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
968
969         return imp;
970 }
971 EXPORT_SYMBOL(class_new_import);
972
973 void class_destroy_import(struct obd_import *import)
974 {
975         LASSERT(import != NULL);
976         LASSERT(import != LP_POISON);
977
978         class_handle_unhash(&import->imp_handle);
979
980         spin_lock(&import->imp_lock);
981         import->imp_generation++;
982         spin_unlock(&import->imp_lock);
983         class_import_put(import);
984 }
985 EXPORT_SYMBOL(class_destroy_import);
986
987 /* A connection defines an export context in which preallocation can
988    be managed. This releases the export pointer reference, and returns
989    the export handle, so the export refcount is 1 when this function
990    returns. */
991 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
992                   struct obd_uuid *cluuid)
993 {
994         struct obd_export *export;
995         LASSERT(conn != NULL);
996         LASSERT(obd != NULL);
997         LASSERT(cluuid != NULL);
998         ENTRY;
999
1000         export = class_new_export(obd, cluuid);
1001         if (IS_ERR(export))
1002                 RETURN(PTR_ERR(export));
1003
1004         conn->cookie = export->exp_handle.h_cookie;
1005         class_export_put(export);
1006
1007         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1008                cluuid->uuid, conn->cookie);
1009         RETURN(0);
1010 }
1011 EXPORT_SYMBOL(class_connect);
1012
1013 /* if export is involved in recovery then clean up related things */
1014 void class_export_recovery_cleanup(struct obd_export *exp)
1015 {
1016         struct obd_device *obd = exp->exp_obd;
1017
1018         spin_lock_bh(&obd->obd_processing_task_lock);
1019         if (obd->obd_recovering && exp->exp_in_recovery) {
1020                 spin_lock(&exp->exp_lock);
1021                 exp->exp_in_recovery = 0;
1022                 spin_unlock(&exp->exp_lock);
1023                 obd->obd_connected_clients--;
1024                 /* each connected client is counted as recoverable */
1025                 obd->obd_recoverable_clients--;
1026                 if (exp->exp_req_replay_needed) {
1027                         spin_lock(&exp->exp_lock);
1028                         exp->exp_req_replay_needed = 0;
1029                         spin_unlock(&exp->exp_lock);
1030                         LASSERT(atomic_read(&obd->obd_req_replay_clients));
1031                         atomic_dec(&obd->obd_req_replay_clients);
1032                 }
1033                 if (exp->exp_lock_replay_needed) {
1034                         spin_lock(&exp->exp_lock);
1035                         exp->exp_lock_replay_needed = 0;
1036                         spin_unlock(&exp->exp_lock);
1037                         LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1038                         atomic_dec(&obd->obd_lock_replay_clients);
1039                 }
1040         }
1041         spin_unlock_bh(&obd->obd_processing_task_lock);
1042 }
1043
1044 /* This function removes 1-3 references from the export:
1045  * 1 - for export pointer passed
1046  * and if disconnect really need
1047  * 2 - removing from hash
1048  * 3 - in client_unlink_export
1049  * The export pointer passed to this function can destroyed */
1050 int class_disconnect(struct obd_export *export)
1051 {
1052         int already_disconnected;
1053         ENTRY;
1054
1055         if (export == NULL) {
1056                 fixme();
1057                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1058                 RETURN(-EINVAL);
1059         }
1060
1061         spin_lock(&export->exp_lock);
1062         already_disconnected = export->exp_disconnected;
1063         export->exp_disconnected = 1;
1064         spin_unlock(&export->exp_lock);
1065
1066         /* class_cleanup(), abort_recovery(), and class_fail_export()
1067          * all end up in here, and if any of them race we shouldn't
1068          * call extra class_export_puts(). */
1069         if (already_disconnected) {
1070                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1071                 GOTO(no_disconn, already_disconnected);
1072         }
1073
1074         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1075                export->exp_handle.h_cookie);
1076
1077         if (!hlist_unhashed(&export->exp_nid_hash))
1078                 lustre_hash_del(export->exp_obd->obd_nid_hash,
1079                                 &export->exp_connection->c_peer.nid,
1080                                 &export->exp_nid_hash);
1081
1082         class_export_recovery_cleanup(export);
1083         class_unlink_export(export);
1084 no_disconn:
1085         class_export_put(export);
1086         RETURN(0);
1087 }
1088
1089 static void class_disconnect_export_list(struct list_head *list,
1090                                          enum obd_option flags)
1091 {
1092         int rc;
1093         struct obd_export *exp;
1094         ENTRY;
1095
1096         /* It's possible that an export may disconnect itself, but
1097          * nothing else will be added to this list. */
1098         while (!list_empty(list)) {
1099                 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1100                 /* need for safe call CDEBUG after obd_disconnect */
1101                 class_export_get(exp);
1102
1103                 spin_lock(&exp->exp_lock);
1104                 exp->exp_flags = flags;
1105                 spin_unlock(&exp->exp_lock);
1106
1107                 if (obd_uuid_equals(&exp->exp_client_uuid,
1108                                     &exp->exp_obd->obd_uuid)) {
1109                         CDEBUG(D_HA,
1110                                "exp %p export uuid == obd uuid, don't discon\n",
1111                                exp);
1112                         /* Need to delete this now so we don't end up pointing
1113                          * to work_list later when this export is cleaned up. */
1114                         list_del_init(&exp->exp_obd_chain);
1115                         class_export_put(exp);
1116                         continue;
1117                 }
1118
1119                 class_export_get(exp);
1120                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1121                        "last request at "CFS_TIME_T"\n",
1122                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1123                        exp, exp->exp_last_request_time);
1124                 /* release one export reference anyway */
1125                 rc = obd_disconnect(exp);
1126
1127                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1128                        obd_export_nid2str(exp), exp, rc);
1129                 class_export_put(exp);
1130         }
1131         EXIT;
1132 }
1133
1134 void class_disconnect_exports(struct obd_device *obd)
1135 {
1136         struct list_head work_list;
1137         ENTRY;
1138
1139         /* Move all of the exports from obd_exports to a work list, en masse. */
1140         CFS_INIT_LIST_HEAD(&work_list);
1141         spin_lock(&obd->obd_dev_lock);
1142         list_splice_init(&obd->obd_exports, &work_list);
1143         list_splice_init(&obd->obd_delayed_exports, &work_list);
1144         spin_unlock(&obd->obd_dev_lock);
1145
1146         if (!list_empty(&work_list)) {
1147                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1148                        "disconnecting them\n", obd->obd_minor, obd);
1149                 class_disconnect_export_list(&work_list,
1150                                              exp_flags_from_obd(obd));
1151         } else
1152                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1153                        obd->obd_minor, obd);
1154         EXIT;
1155 }
1156 EXPORT_SYMBOL(class_disconnect_exports);
1157
1158 /* Remove exports that have not completed recovery.
1159  */
1160 void class_disconnect_stale_exports(struct obd_device *obd,
1161                                     int (*test_export)(struct obd_export *),
1162                                     enum obd_option flags)
1163 {
1164         struct list_head work_list;
1165         struct list_head *pos, *n;
1166         struct obd_export *exp;
1167         ENTRY;
1168
1169         CFS_INIT_LIST_HEAD(&work_list);
1170         spin_lock(&obd->obd_dev_lock);
1171         obd->obd_stale_clients = 0;
1172         list_for_each_safe(pos, n, &obd->obd_exports) {
1173                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1174                 if (test_export(exp))
1175                         continue;
1176
1177                 list_move(&exp->exp_obd_chain, &work_list);
1178                 /* don't count self-export as client */
1179                 if (obd_uuid_equals(&exp->exp_client_uuid,
1180                                      &exp->exp_obd->obd_uuid))
1181                         continue;
1182
1183                 obd->obd_stale_clients++;
1184                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1185                        obd->obd_name, exp->exp_client_uuid.uuid,
1186                        exp->exp_connection == NULL ? "<unknown>" :
1187                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1188         }
1189         spin_unlock(&obd->obd_dev_lock);
1190
1191         CDEBUG(D_HA, "%s: disconnecting %d stale clients\n", obd->obd_name,
1192                obd->obd_stale_clients);
1193
1194         class_disconnect_export_list(&work_list, flags);
1195         EXIT;
1196 }
1197 EXPORT_SYMBOL(class_disconnect_stale_exports);
1198
1199 void class_fail_export(struct obd_export *exp)
1200 {
1201         int rc, already_failed;
1202
1203         spin_lock(&exp->exp_lock);
1204         already_failed = exp->exp_failed;
1205         exp->exp_failed = 1;
1206         spin_unlock(&exp->exp_lock);
1207
1208         if (already_failed) {
1209                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1210                        exp, exp->exp_client_uuid.uuid);
1211                 return;
1212         }
1213
1214         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1215                exp, exp->exp_client_uuid.uuid);
1216
1217         if (obd_dump_on_timeout)
1218                 libcfs_debug_dumplog();
1219
1220         /* Most callers into obd_disconnect are removing their own reference
1221          * (request, for example) in addition to the one from the hash table.
1222          * We don't have such a reference here, so make one. */
1223         class_export_get(exp);
1224         rc = obd_disconnect(exp);
1225         if (rc)
1226                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1227         else
1228                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1229                        exp, exp->exp_client_uuid.uuid);
1230 }
1231 EXPORT_SYMBOL(class_fail_export);
1232
1233 char *obd_export_nid2str(struct obd_export *exp)
1234 {
1235         if (exp->exp_connection != NULL)
1236                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1237
1238         return "(no nid)";
1239 }
1240 EXPORT_SYMBOL(obd_export_nid2str);
1241
1242 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1243 {
1244         struct obd_export *doomed_exp = NULL;
1245         int exports_evicted = 0;
1246
1247         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1248
1249         do {
1250                 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1251                 if (doomed_exp == NULL)
1252                         break;
1253
1254                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1255                          "nid %s found, wanted nid %s, requested nid %s\n",
1256                          obd_export_nid2str(doomed_exp),
1257                          libcfs_nid2str(nid_key), nid);
1258                 LASSERTF(doomed_exp != obd->obd_self_export,
1259                          "self-export is hashed by NID?\n");
1260                 exports_evicted++;
1261                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1262                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1263                        exports_evicted);
1264                 class_fail_export(doomed_exp);
1265                 class_export_put(doomed_exp);
1266         } while (1);
1267
1268         if (!exports_evicted)
1269                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1270                        obd->obd_name, nid);
1271         return exports_evicted;
1272 }
1273 EXPORT_SYMBOL(obd_export_evict_by_nid);
1274
1275 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1276 {
1277         struct obd_export *doomed_exp = NULL;
1278         struct obd_uuid doomed_uuid;
1279         int exports_evicted = 0;
1280
1281         obd_str2uuid(&doomed_uuid, uuid);
1282         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1283                 CERROR("%s: can't evict myself\n", obd->obd_name);
1284                 return exports_evicted;
1285         }
1286
1287         doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1288
1289         if (doomed_exp == NULL) {
1290                 CERROR("%s: can't disconnect %s: no exports found\n",
1291                        obd->obd_name, uuid);
1292         } else {
1293                 CWARN("%s: evicting %s at adminstrative request\n",
1294                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1295                 class_fail_export(doomed_exp);
1296                 class_export_put(doomed_exp);
1297                 exports_evicted++;
1298         }
1299
1300         return exports_evicted;
1301 }
1302 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1303
1304 static void print_export_data(struct obd_export *exp, const char *status)
1305 {
1306         struct ptlrpc_reply_state *rs;
1307         struct ptlrpc_reply_state *first_reply = NULL;
1308         int nreplies = 0;
1309
1310         spin_lock(&exp->exp_lock);
1311         list_for_each_entry (rs, &exp->exp_outstanding_replies, rs_exp_list) {
1312                 if (nreplies == 0)
1313                         first_reply = rs;
1314                 nreplies++;
1315         }
1316         spin_unlock(&exp->exp_lock);
1317
1318         CDEBUG(D_HA, "%s: %s %p %s %s %d %d %d: %p %s %d %d %d %d "LPU64"\n",
1319                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1320                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1321                exp->exp_failed, nreplies, first_reply,
1322                nreplies > 3 ? "..." : "",
1323                atomic_read(&exp->exp_rpc_count),
1324                atomic_read(&exp->exp_cb_count),
1325                exp->exp_disconnected, exp->exp_delayed,
1326                exp->exp_last_committed);
1327 }
1328
1329 void dump_exports(struct obd_device *obd)
1330 {
1331         struct obd_export *exp;
1332
1333         spin_lock(&obd->obd_dev_lock);
1334         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1335                 print_export_data(exp, "ACTIVE");
1336         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1337                 print_export_data(exp, "UNLINKED");
1338         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1339                 print_export_data(exp, "DELAYED");
1340         spin_unlock(&obd->obd_dev_lock);
1341         spin_lock(&obd_zombie_impexp_lock);
1342         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1343                 print_export_data(exp, "ZOMBIE");
1344         spin_unlock(&obd_zombie_impexp_lock);
1345 }
1346 EXPORT_SYMBOL(dump_exports);
1347
1348 void obd_exports_barrier(struct obd_device *obd)
1349 {
1350         int waited = 2;
1351         LASSERT(list_empty(&obd->obd_exports));
1352         spin_lock(&obd->obd_dev_lock);
1353         while (!list_empty(&obd->obd_unlinked_exports)) {
1354                 spin_unlock(&obd->obd_dev_lock);
1355                 cfs_schedule_timeout(CFS_TASK_UNINT, cfs_time_seconds(waited));
1356                 if (waited > 5 && IS_PO2(waited)) {
1357                         LCONSOLE_WARN("Waiting for obd_unlinked_exports "
1358                                       "more than %d seconds. "
1359                                       "The obd refcount = %d. Is it stuck?\n",
1360                                       waited, atomic_read(&obd->obd_refcount));
1361                         dump_exports(obd);
1362                 }
1363                 waited *= 2;
1364                 spin_lock(&obd->obd_dev_lock);
1365         }
1366         spin_unlock(&obd->obd_dev_lock);
1367 }
1368 EXPORT_SYMBOL(obd_exports_barrier);
1369
1370 /**
1371  * kill zombie imports and exports
1372  */
1373 void obd_zombie_impexp_cull(void)
1374 {
1375         struct obd_import *import;
1376         struct obd_export *export;
1377         ENTRY;
1378
1379         do {
1380                 spin_lock(&obd_zombie_impexp_lock);
1381
1382                 import = NULL;
1383                 if (!list_empty(&obd_zombie_imports)) {
1384                         import = list_entry(obd_zombie_imports.next,
1385                                             struct obd_import,
1386                                             imp_zombie_chain);
1387                         list_del_init(&import->imp_zombie_chain);
1388                 }
1389
1390                 export = NULL;
1391                 if (!list_empty(&obd_zombie_exports)) {
1392                         export = list_entry(obd_zombie_exports.next,
1393                                             struct obd_export,
1394                                             exp_obd_chain);
1395                         list_del_init(&export->exp_obd_chain);
1396                 }
1397
1398                 spin_unlock(&obd_zombie_impexp_lock);
1399
1400                 if (import != NULL)
1401                         class_import_destroy(import);
1402
1403                 if (export != NULL)
1404                         class_export_destroy(export);
1405
1406         } while (import != NULL || export != NULL);
1407         EXIT;
1408 }
1409
1410 static struct completion        obd_zombie_start;
1411 static struct completion        obd_zombie_stop;
1412 static unsigned long            obd_zombie_flags;
1413 static cfs_waitq_t              obd_zombie_waitq;
1414
1415 enum {
1416         OBD_ZOMBIE_STOP   = 1 << 1
1417 };
1418
1419 /**
1420  * check for work for kill zombie import/export thread.
1421  */
1422 static int obd_zombie_impexp_check(void *arg)
1423 {
1424         int rc;
1425
1426         spin_lock(&obd_zombie_impexp_lock);
1427         rc = list_empty(&obd_zombie_imports) &&
1428              list_empty(&obd_zombie_exports) &&
1429              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1430
1431         spin_unlock(&obd_zombie_impexp_lock);
1432
1433         RETURN(rc);
1434 }
1435
1436 /**
1437  * Add export to the obd_zombe thread and notify it.
1438  */
1439 static void obd_zombie_export_add(struct obd_export *exp) {
1440         spin_lock(&exp->exp_obd->obd_dev_lock);
1441         LASSERT(!list_empty(&exp->exp_obd_chain));
1442         list_del_init(&exp->exp_obd_chain);
1443         spin_unlock(&exp->exp_obd->obd_dev_lock);
1444         spin_lock(&obd_zombie_impexp_lock);
1445         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1446         spin_unlock(&obd_zombie_impexp_lock);
1447
1448         if (obd_zombie_impexp_notify != NULL)
1449                 obd_zombie_impexp_notify();
1450 }
1451
1452 /**
1453  * Add import to the obd_zombe thread and notify it.
1454  */
1455 static void obd_zombie_import_add(struct obd_import *imp) {
1456         LASSERT(imp->imp_sec == NULL);
1457         spin_lock(&obd_zombie_impexp_lock);
1458         LASSERT(list_empty(&imp->imp_zombie_chain));
1459         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1460         spin_unlock(&obd_zombie_impexp_lock);
1461
1462         if (obd_zombie_impexp_notify != NULL)
1463                 obd_zombie_impexp_notify();
1464 }
1465
1466 /**
1467  * notify import/export destroy thread about new zombie.
1468  */
1469 static void obd_zombie_impexp_notify(void)
1470 {
1471         cfs_waitq_signal(&obd_zombie_waitq);
1472 }
1473
1474 /**
1475  * check whether obd_zombie is idle
1476  */
1477 static int obd_zombie_is_idle(void)
1478 {
1479         int rc;
1480
1481         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1482         spin_lock(&obd_zombie_impexp_lock);
1483         rc = list_empty(&obd_zombie_imports) &&
1484              list_empty(&obd_zombie_exports);
1485         spin_unlock(&obd_zombie_impexp_lock);
1486         return rc;
1487 }
1488
1489 /**
1490  * wait when obd_zombie import/export queues become empty
1491  */
1492 void obd_zombie_barrier(void)
1493 {
1494         struct l_wait_info lwi = { 0 };
1495         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1496 }
1497 EXPORT_SYMBOL(obd_zombie_barrier);
1498
1499 #ifdef __KERNEL__
1500
1501 /**
1502  * destroy zombie export/import thread.
1503  */
1504 static int obd_zombie_impexp_thread(void *unused)
1505 {
1506         int rc;
1507
1508         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1509                 complete(&obd_zombie_start);
1510                 RETURN(rc);
1511         }
1512
1513         complete(&obd_zombie_start);
1514
1515         while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1516                 struct l_wait_info lwi = { 0 };
1517
1518                 l_wait_event(obd_zombie_waitq, 
1519                              !obd_zombie_impexp_check(NULL), &lwi);
1520                 obd_zombie_impexp_cull();
1521
1522                 /* 
1523                  * Notify obd_zombie_barrier callers that queues
1524                  * may be empty.
1525                  */
1526                 cfs_waitq_signal(&obd_zombie_waitq);
1527         }
1528
1529         complete(&obd_zombie_stop);
1530
1531         RETURN(0);
1532 }
1533
1534 #else /* ! KERNEL */
1535
1536 static atomic_t zombie_recur = ATOMIC_INIT(0);
1537 static void *obd_zombie_impexp_work_cb;
1538 static void *obd_zombie_impexp_idle_cb;
1539
1540 int obd_zombie_impexp_kill(void *arg)
1541 {
1542         int rc = 0;
1543
1544         if (atomic_inc_return(&zombie_recur) == 1) {
1545                 obd_zombie_impexp_cull();
1546                 rc = 1;
1547         }
1548         atomic_dec(&zombie_recur);
1549         return rc;
1550 }
1551
1552 #endif
1553
1554 /**
1555  * start destroy zombie import/export thread
1556  */
1557 int obd_zombie_impexp_init(void)
1558 {
1559         int rc;
1560
1561         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1562         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1563         spin_lock_init(&obd_zombie_impexp_lock);
1564         init_completion(&obd_zombie_start);
1565         init_completion(&obd_zombie_stop);
1566         cfs_waitq_init(&obd_zombie_waitq);
1567
1568 #ifdef __KERNEL__
1569         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1570         if (rc < 0)
1571                 RETURN(rc);
1572
1573         wait_for_completion(&obd_zombie_start);
1574 #else
1575
1576         obd_zombie_impexp_work_cb =
1577                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1578                                                  &obd_zombie_impexp_kill, NULL);
1579
1580         obd_zombie_impexp_idle_cb =
1581                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1582                                                  &obd_zombie_impexp_check, NULL);
1583         rc = 0;
1584 #endif
1585         RETURN(rc);
1586 }
1587 /**
1588  * stop destroy zombie import/export thread
1589  */
1590 void obd_zombie_impexp_stop(void)
1591 {
1592         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1593         obd_zombie_impexp_notify();
1594 #ifdef __KERNEL__
1595         wait_for_completion(&obd_zombie_stop);
1596 #else
1597         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1598         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1599 #endif
1600 }