Whamcloud - gitweb
Add lu_ref tracking to obd_device.
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
50
51 extern struct list_head obd_types;
52 spinlock_t obd_types_lock;
53
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
58
59 struct list_head  obd_zombie_imports;
60 struct list_head  obd_zombie_exports;
61 spinlock_t        obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
63
64 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
65
66 /*
67  * support functions: we could use inter-module communication, but this
68  * is more portable to other OS's
69  */
70 static struct obd_device *obd_device_alloc(void)
71 {
72         struct obd_device *obd;
73
74         OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
75         if (obd != NULL) {
76                 obd->obd_magic = OBD_DEVICE_MAGIC;
77         }
78         return obd;
79 }
80 EXPORT_SYMBOL(obd_device_alloc);
81
82 static void obd_device_free(struct obd_device *obd)
83 {
84         LASSERT(obd != NULL);
85         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
86                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
87         if (obd->obd_namespace != NULL) {
88                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89                        obd, obd->obd_namespace, obd->obd_force);
90                 LBUG();
91         }
92         lu_ref_fini(&obd->obd_reference);
93         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
94 }
95
96 struct obd_type *class_search_type(const char *name)
97 {
98         struct list_head *tmp;
99         struct obd_type *type;
100
101         spin_lock(&obd_types_lock);
102         list_for_each(tmp, &obd_types) {
103                 type = list_entry(tmp, struct obd_type, typ_chain);
104                 if (strcmp(type->typ_name, name) == 0) {
105                         spin_unlock(&obd_types_lock);
106                         return type;
107                 }
108         }
109         spin_unlock(&obd_types_lock);
110         return NULL;
111 }
112
113 struct obd_type *class_get_type(const char *name)
114 {
115         struct obd_type *type = class_search_type(name);
116
117 #ifdef CONFIG_KMOD
118         if (!type) {
119                 const char *modname = name;
120                 if (!request_module(modname)) {
121                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
122                         type = class_search_type(name);
123                 } else {
124                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
125                                            modname);
126                 }
127         }
128 #endif
129         if (type) {
130                 spin_lock(&type->obd_type_lock);
131                 type->typ_refcnt++;
132                 try_module_get(type->typ_dt_ops->o_owner);
133                 spin_unlock(&type->obd_type_lock);
134         }
135         return type;
136 }
137
138 void class_put_type(struct obd_type *type)
139 {
140         LASSERT(type);
141         spin_lock(&type->obd_type_lock);
142         type->typ_refcnt--;
143         module_put(type->typ_dt_ops->o_owner);
144         spin_unlock(&type->obd_type_lock);
145 }
146
147 #define CLASS_MAX_NAME 1024
148
149 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
150                         struct lprocfs_vars *vars, const char *name,
151                         struct lu_device_type *ldt)
152 {
153         struct obd_type *type;
154         int rc = 0;
155         ENTRY;
156
157         /* sanity check */
158         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
159
160         if (class_search_type(name)) {
161                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
162                 RETURN(-EEXIST);
163         }
164
165         rc = -ENOMEM;
166         OBD_ALLOC(type, sizeof(*type));
167         if (type == NULL)
168                 RETURN(rc);
169
170         OBD_ALLOC_PTR(type->typ_dt_ops);
171         OBD_ALLOC_PTR(type->typ_md_ops);
172         OBD_ALLOC(type->typ_name, strlen(name) + 1);
173
174         if (type->typ_dt_ops == NULL ||
175             type->typ_md_ops == NULL ||
176             type->typ_name == NULL)
177                 GOTO (failed, rc);
178
179         *(type->typ_dt_ops) = *dt_ops;
180         /* md_ops is optional */
181         if (md_ops)
182                 *(type->typ_md_ops) = *md_ops;
183         strcpy(type->typ_name, name);
184         spin_lock_init(&type->obd_type_lock);
185
186 #ifdef LPROCFS
187         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
188                                               vars, type);
189         if (IS_ERR(type->typ_procroot)) {
190                 rc = PTR_ERR(type->typ_procroot);
191                 type->typ_procroot = NULL;
192                 GOTO (failed, rc);
193         }
194 #endif
195         if (ldt != NULL) {
196                 type->typ_lu = ldt;
197                 rc = lu_device_type_init(ldt);
198                 if (rc != 0)
199                         GOTO (failed, rc);
200         }
201
202         spin_lock(&obd_types_lock);
203         list_add(&type->typ_chain, &obd_types);
204         spin_unlock(&obd_types_lock);
205
206         RETURN (0);
207
208  failed:
209         if (type->typ_name != NULL)
210                 OBD_FREE(type->typ_name, strlen(name) + 1);
211         if (type->typ_md_ops != NULL)
212                 OBD_FREE_PTR(type->typ_md_ops);
213         if (type->typ_dt_ops != NULL)
214                 OBD_FREE_PTR(type->typ_dt_ops);
215         OBD_FREE(type, sizeof(*type));
216         RETURN(rc);
217 }
218
219 int class_unregister_type(const char *name)
220 {
221         struct obd_type *type = class_search_type(name);
222         ENTRY;
223
224         if (!type) {
225                 CERROR("unknown obd type\n");
226                 RETURN(-EINVAL);
227         }
228
229         if (type->typ_refcnt) {
230                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
231                 /* This is a bad situation, let's make the best of it */
232                 /* Remove ops, but leave the name for debugging */
233                 OBD_FREE_PTR(type->typ_dt_ops);
234                 OBD_FREE_PTR(type->typ_md_ops);
235                 RETURN(-EBUSY);
236         }
237
238         if (type->typ_procroot) {
239                 lprocfs_remove(&type->typ_procroot);
240         }
241
242         if (type->typ_lu)
243                 lu_device_type_fini(type->typ_lu);
244
245         spin_lock(&obd_types_lock);
246         list_del(&type->typ_chain);
247         spin_unlock(&obd_types_lock);
248         OBD_FREE(type->typ_name, strlen(name) + 1);
249         if (type->typ_dt_ops != NULL)
250                 OBD_FREE_PTR(type->typ_dt_ops);
251         if (type->typ_md_ops != NULL)
252                 OBD_FREE_PTR(type->typ_md_ops);
253         OBD_FREE(type, sizeof(*type));
254         RETURN(0);
255 } /* class_unregister_type */
256
257 /**
258  * Create a new obd device.
259  *
260  * Find an empty slot in ::obd_devs[], create a new obd device in it.
261  *
262  * \param typename [in] obd device type string.
263  * \param name     [in] obd device name.
264  *
265  * \retval NULL if create fails, otherwise return the obd device
266  *         pointer created.
267  */
268 struct obd_device *class_newdev(const char *type_name, const char *name)
269 {
270         struct obd_device *result = NULL;
271         struct obd_device *newdev;
272         struct obd_type *type = NULL;
273         int i;
274         int new_obd_minor = 0;
275
276         if (strlen(name) >= MAX_OBD_NAME) {
277                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
278                 RETURN(ERR_PTR(-EINVAL));
279         }
280
281         type = class_get_type(type_name);
282         if (type == NULL){
283                 CERROR("OBD: unknown type: %s\n", type_name);
284                 RETURN(ERR_PTR(-ENODEV));
285         }
286
287         newdev = obd_device_alloc();
288         if (newdev == NULL) {
289                 class_put_type(type);
290                 RETURN(ERR_PTR(-ENOMEM));
291         }
292         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
293
294         spin_lock(&obd_dev_lock);
295         for (i = 0; i < class_devno_max(); i++) {
296                 struct obd_device *obd = class_num2obd(i);
297                 if (obd && obd->obd_name &&
298                     (strcmp(name, obd->obd_name) == 0)) {
299                         CERROR("Device %s already exists, won't add\n", name);
300                         if (result) {
301                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
302                                          "%p obd_magic %08x != %08x\n", result,
303                                          result->obd_magic, OBD_DEVICE_MAGIC);
304                                 LASSERTF(result->obd_minor == new_obd_minor,
305                                          "%p obd_minor %d != %d\n", result,
306                                          result->obd_minor, new_obd_minor);
307
308                                 obd_devs[result->obd_minor] = NULL;
309                                 result->obd_name[0]='\0';
310                          }
311                         result = ERR_PTR(-EEXIST);
312                         break;
313                 }
314                 if (!result && !obd) {
315                         result = newdev;
316                         result->obd_minor = i;
317                         new_obd_minor = i;
318                         result->obd_type = type;
319                         strncpy(result->obd_name, name,
320                                 sizeof(result->obd_name) - 1);
321                         obd_devs[i] = result;
322                 }
323         }
324         spin_unlock(&obd_dev_lock);
325
326         if (result == NULL && i >= class_devno_max()) {
327                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
328                        class_devno_max());
329                 result = ERR_PTR(-EOVERFLOW);
330         }
331
332         if (IS_ERR(result)) {
333                 obd_device_free(newdev);
334                 class_put_type(type);
335         } else {
336                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
337                        result->obd_name, result);
338         }
339         return result;
340 }
341
342 void class_release_dev(struct obd_device *obd)
343 {
344         struct obd_type *obd_type = obd->obd_type;
345
346         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
347                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
348         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
349                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
350         LASSERT(obd_type != NULL);
351
352         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
353                obd->obd_name,obd->obd_type->typ_name);
354
355         spin_lock(&obd_dev_lock);
356         obd_devs[obd->obd_minor] = NULL;
357         spin_unlock(&obd_dev_lock);
358         obd_device_free(obd);
359
360         class_put_type(obd_type);
361 }
362
363 int class_name2dev(const char *name)
364 {
365         int i;
366
367         if (!name)
368                 return -1;
369
370         spin_lock(&obd_dev_lock);
371         for (i = 0; i < class_devno_max(); i++) {
372                 struct obd_device *obd = class_num2obd(i);
373                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
374                         /* Make sure we finished attaching before we give
375                            out any references */
376                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
377                         if (obd->obd_attached) {
378                                 spin_unlock(&obd_dev_lock);
379                                 return i;
380                         }
381                         break;
382                 }
383         }
384         spin_unlock(&obd_dev_lock);
385
386         return -1;
387 }
388
389 struct obd_device *class_name2obd(const char *name)
390 {
391         int dev = class_name2dev(name);
392
393         if (dev < 0 || dev > class_devno_max())
394                 return NULL;
395         return class_num2obd(dev);
396 }
397
398 int class_uuid2dev(struct obd_uuid *uuid)
399 {
400         int i;
401
402         spin_lock(&obd_dev_lock);
403         for (i = 0; i < class_devno_max(); i++) {
404                 struct obd_device *obd = class_num2obd(i);
405                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
406                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
407                         spin_unlock(&obd_dev_lock);
408                         return i;
409                 }
410         }
411         spin_unlock(&obd_dev_lock);
412
413         return -1;
414 }
415
416 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
417 {
418         int dev = class_uuid2dev(uuid);
419         if (dev < 0)
420                 return NULL;
421         return class_num2obd(dev);
422 }
423
424 /**
425  * Get obd device from ::obd_devs[]
426  *
427  * \param num [in] array index
428  *
429  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
430  *         otherwise return the obd device there.
431  */
432 struct obd_device *class_num2obd(int num)
433 {
434         struct obd_device *obd = NULL;
435
436         if (num < class_devno_max()) {
437                 obd = obd_devs[num];
438                 if (obd == NULL)
439                         return NULL;
440
441                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
442                          "%p obd_magic %08x != %08x\n",
443                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
444                 LASSERTF(obd->obd_minor == num,
445                          "%p obd_minor %0d != %0d\n",
446                          obd, obd->obd_minor, num);
447         }
448
449         return obd;
450 }
451
452 void class_obd_list(void)
453 {
454         char *status;
455         int i;
456
457         spin_lock(&obd_dev_lock);
458         for (i = 0; i < class_devno_max(); i++) {
459                 struct obd_device *obd = class_num2obd(i);
460                 if (obd == NULL)
461                         continue;
462                 if (obd->obd_stopping)
463                         status = "ST";
464                 else if (obd->obd_set_up)
465                         status = "UP";
466                 else if (obd->obd_attached)
467                         status = "AT";
468                 else
469                         status = "--";
470                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
471                          i, status, obd->obd_type->typ_name,
472                          obd->obd_name, obd->obd_uuid.uuid,
473                          atomic_read(&obd->obd_refcount));
474         }
475         spin_unlock(&obd_dev_lock);
476         return;
477 }
478
479 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
480    specified, then only the client with that uuid is returned,
481    otherwise any client connected to the tgt is returned. */
482 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
483                                           const char * typ_name,
484                                           struct obd_uuid *grp_uuid)
485 {
486         int i;
487
488         spin_lock(&obd_dev_lock);
489         for (i = 0; i < class_devno_max(); i++) {
490                 struct obd_device *obd = class_num2obd(i);
491                 if (obd == NULL)
492                         continue;
493                 if ((strncmp(obd->obd_type->typ_name, typ_name,
494                              strlen(typ_name)) == 0)) {
495                         if (obd_uuid_equals(tgt_uuid,
496                                             &obd->u.cli.cl_target_uuid) &&
497                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
498                                                          &obd->obd_uuid) : 1)) {
499                                 spin_unlock(&obd_dev_lock);
500                                 return obd;
501                         }
502                 }
503         }
504         spin_unlock(&obd_dev_lock);
505
506         return NULL;
507 }
508
509 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
510                                             struct obd_uuid *grp_uuid)
511 {
512         struct obd_device *obd;
513
514         obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
515         if (!obd)
516                 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
517                                             grp_uuid);
518         return obd;
519 }
520
521 /* Iterate the obd_device list looking devices have grp_uuid. Start
522    searching at *next, and if a device is found, the next index to look
523    at is saved in *next. If next is NULL, then the first matching device
524    will always be returned. */
525 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
526 {
527         int i;
528
529         if (next == NULL)
530                 i = 0;
531         else if (*next >= 0 && *next < class_devno_max())
532                 i = *next;
533         else
534                 return NULL;
535
536         spin_lock(&obd_dev_lock);
537         for (; i < class_devno_max(); i++) {
538                 struct obd_device *obd = class_num2obd(i);
539                 if (obd == NULL)
540                         continue;
541                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
542                         if (next != NULL)
543                                 *next = i+1;
544                         spin_unlock(&obd_dev_lock);
545                         return obd;
546                 }
547         }
548         spin_unlock(&obd_dev_lock);
549
550         return NULL;
551 }
552
553
554 void obd_cleanup_caches(void)
555 {
556         int rc;
557
558         ENTRY;
559         if (obd_device_cachep) {
560                 rc = cfs_mem_cache_destroy(obd_device_cachep);
561                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
562                 obd_device_cachep = NULL;
563         }
564         if (obdo_cachep) {
565                 rc = cfs_mem_cache_destroy(obdo_cachep);
566                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
567                 obdo_cachep = NULL;
568         }
569         if (import_cachep) {
570                 rc = cfs_mem_cache_destroy(import_cachep);
571                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
572                 import_cachep = NULL;
573         }
574         if (capa_cachep) {
575                 rc = cfs_mem_cache_destroy(capa_cachep);
576                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
577                 capa_cachep = NULL;
578         }
579         EXIT;
580 }
581
582 int obd_init_caches(void)
583 {
584         ENTRY;
585
586         LASSERT(obd_device_cachep == NULL);
587         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
588                                                  sizeof(struct obd_device),
589                                                  0, 0);
590         if (!obd_device_cachep)
591                 GOTO(out, -ENOMEM);
592
593         LASSERT(obdo_cachep == NULL);
594         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
595                                            0, 0);
596         if (!obdo_cachep)
597                 GOTO(out, -ENOMEM);
598
599         LASSERT(import_cachep == NULL);
600         import_cachep = cfs_mem_cache_create("ll_import_cache",
601                                              sizeof(struct obd_import),
602                                              0, 0);
603         if (!import_cachep)
604                 GOTO(out, -ENOMEM);
605
606         LASSERT(capa_cachep == NULL);
607         capa_cachep = cfs_mem_cache_create("capa_cache",
608                                            sizeof(struct obd_capa), 0, 0);
609         if (!capa_cachep)
610                 GOTO(out, -ENOMEM);
611
612         RETURN(0);
613  out:
614         obd_cleanup_caches();
615         RETURN(-ENOMEM);
616
617 }
618
619 /* map connection to client */
620 struct obd_export *class_conn2export(struct lustre_handle *conn)
621 {
622         struct obd_export *export;
623         ENTRY;
624
625         if (!conn) {
626                 CDEBUG(D_CACHE, "looking for null handle\n");
627                 RETURN(NULL);
628         }
629
630         if (conn->cookie == -1) {  /* this means assign a new connection */
631                 CDEBUG(D_CACHE, "want a new connection\n");
632                 RETURN(NULL);
633         }
634
635         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
636         export = class_handle2object(conn->cookie);
637         RETURN(export);
638 }
639
640 struct obd_device *class_exp2obd(struct obd_export *exp)
641 {
642         if (exp)
643                 return exp->exp_obd;
644         return NULL;
645 }
646
647 struct obd_device *class_conn2obd(struct lustre_handle *conn)
648 {
649         struct obd_export *export;
650         export = class_conn2export(conn);
651         if (export) {
652                 struct obd_device *obd = export->exp_obd;
653                 class_export_put(export);
654                 return obd;
655         }
656         return NULL;
657 }
658
659 struct obd_import *class_exp2cliimp(struct obd_export *exp)
660 {
661         struct obd_device *obd = exp->exp_obd;
662         if (obd == NULL)
663                 return NULL;
664         return obd->u.cli.cl_import;
665 }
666
667 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
668 {
669         struct obd_device *obd = class_conn2obd(conn);
670         if (obd == NULL)
671                 return NULL;
672         return obd->u.cli.cl_import;
673 }
674
675 /* Export management functions */
676 static void export_handle_addref(void *export)
677 {
678         class_export_get(export);
679 }
680
681 struct obd_export *class_export_get(struct obd_export *exp)
682 {
683         atomic_inc(&exp->exp_refcount);
684         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
685                atomic_read(&exp->exp_refcount));
686         return exp;
687 }
688 EXPORT_SYMBOL(class_export_get);
689
690 void class_export_put(struct obd_export *exp)
691 {
692         LASSERT(exp != NULL);
693         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
694                atomic_read(&exp->exp_refcount) - 1);
695         LASSERT(atomic_read(&exp->exp_refcount) > 0);
696         LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
697
698         if (atomic_dec_and_test(&exp->exp_refcount)) {
699                 LASSERT (list_empty(&exp->exp_obd_chain));
700
701                 CDEBUG(D_IOCTL, "final put %p/%s\n",
702                        exp, exp->exp_client_uuid.uuid);
703
704                 spin_lock(&obd_zombie_impexp_lock);
705                 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
706                 spin_unlock(&obd_zombie_impexp_lock);
707
708                 if (obd_zombie_impexp_notify != NULL)
709                         obd_zombie_impexp_notify();
710         }
711 }
712 EXPORT_SYMBOL(class_export_put);
713
714 static void class_export_destroy(struct obd_export *exp)
715 {
716         struct obd_device *obd = exp->exp_obd;
717         ENTRY;
718
719         LASSERT (atomic_read(&exp->exp_refcount) == 0);
720
721         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
722                exp->exp_client_uuid.uuid, obd->obd_name);
723
724         LASSERT(obd != NULL);
725
726         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
727         if (exp->exp_connection)
728                 ptlrpc_put_connection_superhack(exp->exp_connection);
729
730         LASSERT(list_empty(&exp->exp_outstanding_replies));
731         LASSERT(list_empty(&exp->exp_req_replay_queue));
732         obd_destroy_export(exp);
733         class_decref(obd, "export", exp);
734
735         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
736         EXIT;
737 }
738
739 /* Creates a new export, adds it to the hash table, and returns a
740  * pointer to it. The refcount is 2: one for the hash reference, and
741  * one for the pointer returned by this function. */
742 struct obd_export *class_new_export(struct obd_device *obd,
743                                     struct obd_uuid *cluuid)
744 {
745         struct obd_export *export;
746         int rc = 0;
747
748         OBD_ALLOC_PTR(export);
749         if (!export)
750                 return ERR_PTR(-ENOMEM);
751
752         export->exp_conn_cnt = 0;
753         export->exp_lock_hash = NULL;
754         atomic_set(&export->exp_refcount, 2);
755         atomic_set(&export->exp_rpc_count, 0);
756         export->exp_obd = obd;
757         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
758         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
759         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
760         class_handle_hash(&export->exp_handle, export_handle_addref);
761         export->exp_last_request_time = cfs_time_current_sec();
762         spin_lock_init(&export->exp_lock);
763         INIT_HLIST_NODE(&export->exp_uuid_hash);
764         INIT_HLIST_NODE(&export->exp_nid_hash);
765
766         export->exp_sp_peer = LUSTRE_SP_ANY;
767         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
768         export->exp_client_uuid = *cluuid;
769         obd_init_export(export);
770
771         spin_lock(&obd->obd_dev_lock);
772         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
773                 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
774                                             &export->exp_uuid_hash);
775                 if (rc != 0) {
776                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
777                                       obd->obd_name, cluuid->uuid, rc);
778                         spin_unlock(&obd->obd_dev_lock);
779                         class_handle_unhash(&export->exp_handle);
780                         OBD_FREE_PTR(export);
781                         return ERR_PTR(-EALREADY);
782                 }
783         }
784
785         LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
786         class_incref(obd, "export", export);
787         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
788         list_add_tail(&export->exp_obd_chain_timed,
789                       &export->exp_obd->obd_exports_timed);
790         export->exp_obd->obd_num_exports++;
791         spin_unlock(&obd->obd_dev_lock);
792
793         return export;
794 }
795 EXPORT_SYMBOL(class_new_export);
796
797 void class_unlink_export(struct obd_export *exp)
798 {
799         class_handle_unhash(&exp->exp_handle);
800
801         spin_lock(&exp->exp_obd->obd_dev_lock);
802         /* delete an uuid-export hashitem from hashtables */
803         if (!hlist_unhashed(&exp->exp_uuid_hash))
804                 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
805                                 &exp->exp_client_uuid,
806                                 &exp->exp_uuid_hash);
807
808         list_del_init(&exp->exp_obd_chain);
809         list_del_init(&exp->exp_obd_chain_timed);
810         exp->exp_obd->obd_num_exports--;
811         spin_unlock(&exp->exp_obd->obd_dev_lock);
812
813         class_export_put(exp);
814 }
815 EXPORT_SYMBOL(class_unlink_export);
816
817 /* Import management functions */
818 static void import_handle_addref(void *import)
819 {
820         class_import_get(import);
821 }
822
823 struct obd_import *class_import_get(struct obd_import *import)
824 {
825         LASSERT(atomic_read(&import->imp_refcount) >= 0);
826         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
827         atomic_inc(&import->imp_refcount);
828         CDEBUG(D_INFO, "import %p refcount=%d\n", import,
829                atomic_read(&import->imp_refcount));
830         return import;
831 }
832 EXPORT_SYMBOL(class_import_get);
833
834 void class_import_put(struct obd_import *import)
835 {
836         ENTRY;
837
838         LASSERT(atomic_read(&import->imp_refcount) > 0);
839         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
840         LASSERT(list_empty(&import->imp_zombie_chain));
841
842         CDEBUG(D_INFO, "import %p refcount=%d\n", import,
843                atomic_read(&import->imp_refcount) - 1);
844
845         if (atomic_dec_and_test(&import->imp_refcount)) {
846
847                 CDEBUG(D_INFO, "final put import %p\n", import);
848
849                 spin_lock(&obd_zombie_impexp_lock);
850                 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
851                 spin_unlock(&obd_zombie_impexp_lock);
852
853                 if (obd_zombie_impexp_notify != NULL)
854                         obd_zombie_impexp_notify();
855         }
856
857         EXIT;
858 }
859 EXPORT_SYMBOL(class_import_put);
860
861 void class_import_destroy(struct obd_import *import)
862 {
863         ENTRY;
864
865         CDEBUG(D_IOCTL, "destroying import %p for %s\n", import,
866                 import->imp_obd->obd_name);
867
868         LASSERT(atomic_read(&import->imp_refcount) == 0);
869
870         ptlrpc_put_connection_superhack(import->imp_connection);
871
872         while (!list_empty(&import->imp_conn_list)) {
873                 struct obd_import_conn *imp_conn;
874
875                 imp_conn = list_entry(import->imp_conn_list.next,
876                                       struct obd_import_conn, oic_item);
877                 list_del(&imp_conn->oic_item);
878                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
879                 OBD_FREE(imp_conn, sizeof(*imp_conn));
880         }
881
882         LASSERT(import->imp_sec == NULL);
883         class_decref(import->imp_obd, "import", import);
884         OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
885         EXIT;
886 }
887
888 static void init_imp_at(struct imp_at *at) {
889         int i;
890         at_init(&at->iat_net_latency, 0, 0);
891         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
892                 /* max service estimates are tracked on the server side, so
893                    don't use the AT history here, just use the last reported
894                    val. (But keep hist for proc histogram, worst_ever) */
895                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
896                         AT_FLG_NOHIST);
897         }
898 }
899
900 struct obd_import *class_new_import(struct obd_device *obd)
901 {
902         struct obd_import *imp;
903
904         OBD_ALLOC(imp, sizeof(*imp));
905         if (imp == NULL)
906                 return NULL;
907
908         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
909         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
910         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
911         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
912         spin_lock_init(&imp->imp_lock);
913         imp->imp_last_success_conn = 0;
914         imp->imp_state = LUSTRE_IMP_NEW;
915         imp->imp_obd = class_incref(obd, "import", imp);
916         sema_init(&imp->imp_sec_mutex, 1);
917         cfs_waitq_init(&imp->imp_recovery_waitq);
918
919         atomic_set(&imp->imp_refcount, 2);
920         atomic_set(&imp->imp_inflight, 0);
921         atomic_set(&imp->imp_replay_inflight, 0);
922         atomic_set(&imp->imp_inval_count, 0);
923         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
924         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
925         class_handle_hash(&imp->imp_handle, import_handle_addref);
926         init_imp_at(&imp->imp_at);
927
928         /* the default magic is V2, will be used in connect RPC, and
929          * then adjusted according to the flags in request/reply. */
930         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
931
932         return imp;
933 }
934 EXPORT_SYMBOL(class_new_import);
935
936 void class_destroy_import(struct obd_import *import)
937 {
938         LASSERT(import != NULL);
939         LASSERT(import != LP_POISON);
940
941         class_handle_unhash(&import->imp_handle);
942
943         spin_lock(&import->imp_lock);
944         import->imp_generation++;
945         spin_unlock(&import->imp_lock);
946         class_import_put(import);
947 }
948 EXPORT_SYMBOL(class_destroy_import);
949
950 /* A connection defines an export context in which preallocation can
951    be managed. This releases the export pointer reference, and returns
952    the export handle, so the export refcount is 1 when this function
953    returns. */
954 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
955                   struct obd_uuid *cluuid)
956 {
957         struct obd_export *export;
958         LASSERT(conn != NULL);
959         LASSERT(obd != NULL);
960         LASSERT(cluuid != NULL);
961         ENTRY;
962
963         export = class_new_export(obd, cluuid);
964         if (IS_ERR(export))
965                 RETURN(PTR_ERR(export));
966
967         conn->cookie = export->exp_handle.h_cookie;
968         class_export_put(export);
969
970         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
971                cluuid->uuid, conn->cookie);
972         RETURN(0);
973 }
974 EXPORT_SYMBOL(class_connect);
975
976 /* if export is involved in recovery then clean up related things */
977 void class_export_recovery_cleanup(struct obd_export *exp)
978 {
979         struct obd_device *obd = exp->exp_obd;
980
981         spin_lock_bh(&obd->obd_processing_task_lock);
982         if (obd->obd_recovering && exp->exp_in_recovery) {
983                 spin_lock(&exp->exp_lock);
984                 exp->exp_in_recovery = 0;
985                 spin_unlock(&exp->exp_lock);
986                 obd->obd_connected_clients--;
987                 /* each connected client is counted as recoverable */
988                 obd->obd_recoverable_clients--;
989                 if (exp->exp_req_replay_needed) {
990                         spin_lock(&exp->exp_lock);
991                         exp->exp_req_replay_needed = 0;
992                         spin_unlock(&exp->exp_lock);
993                         LASSERT(atomic_read(&obd->obd_req_replay_clients));
994                         atomic_dec(&obd->obd_req_replay_clients);
995                 }
996                 if (exp->exp_lock_replay_needed) {
997                         spin_lock(&exp->exp_lock);
998                         exp->exp_lock_replay_needed = 0;
999                         spin_unlock(&exp->exp_lock);
1000                         LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1001                         atomic_dec(&obd->obd_lock_replay_clients);
1002                 }
1003         }
1004         spin_unlock_bh(&obd->obd_processing_task_lock);
1005 }
1006
1007 /* This function removes two references from the export: one for the
1008  * hash entry and one for the export pointer passed in.  The export
1009  * pointer passed to this function is destroyed should not be used
1010  * again. */
1011 int class_disconnect(struct obd_export *export)
1012 {
1013         int already_disconnected;
1014         ENTRY;
1015
1016         if (export == NULL) {
1017                 fixme();
1018                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1019                 RETURN(-EINVAL);
1020         }
1021
1022         spin_lock(&export->exp_lock);
1023         already_disconnected = export->exp_disconnected;
1024         export->exp_disconnected = 1;
1025
1026         if (!hlist_unhashed(&export->exp_nid_hash))
1027                 lustre_hash_del(export->exp_obd->obd_nid_hash,
1028                                 &export->exp_connection->c_peer.nid,
1029                                 &export->exp_nid_hash);
1030
1031         spin_unlock(&export->exp_lock);
1032
1033         /* class_cleanup(), abort_recovery(), and class_fail_export()
1034          * all end up in here, and if any of them race we shouldn't
1035          * call extra class_export_puts(). */
1036         if (already_disconnected)
1037                 RETURN(0);
1038
1039         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1040                export->exp_handle.h_cookie);
1041
1042         class_export_recovery_cleanup(export);
1043         class_unlink_export(export);
1044         class_export_put(export);
1045         RETURN(0);
1046 }
1047
1048 static void class_disconnect_export_list(struct list_head *list, int flags)
1049 {
1050         int rc;
1051         struct lustre_handle fake_conn;
1052         struct obd_export *fake_exp, *exp;
1053         ENTRY;
1054
1055         /* It's possible that an export may disconnect itself, but
1056          * nothing else will be added to this list. */
1057         while (!list_empty(list)) {
1058                 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1059                 class_export_get(exp);
1060
1061                 spin_lock(&exp->exp_lock);
1062                 exp->exp_flags = flags;
1063                 spin_unlock(&exp->exp_lock);
1064
1065                 if (obd_uuid_equals(&exp->exp_client_uuid,
1066                                     &exp->exp_obd->obd_uuid)) {
1067                         CDEBUG(D_HA,
1068                                "exp %p export uuid == obd uuid, don't discon\n",
1069                                exp);
1070                         /* Need to delete this now so we don't end up pointing
1071                          * to work_list later when this export is cleaned up. */
1072                         list_del_init(&exp->exp_obd_chain);
1073                         class_export_put(exp);
1074                         continue;
1075                 }
1076
1077                 fake_conn.cookie = exp->exp_handle.h_cookie;
1078                 fake_exp = class_conn2export(&fake_conn);
1079                 if (!fake_exp) {
1080                         class_export_put(exp);
1081                         continue;
1082                 }
1083
1084                 spin_lock(&fake_exp->exp_lock);
1085                 fake_exp->exp_flags = flags;
1086                 spin_unlock(&fake_exp->exp_lock);
1087
1088                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1089                        "last request at "CFS_TIME_T"\n",
1090                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1091                        exp, exp->exp_last_request_time);
1092                 rc = obd_disconnect(fake_exp);
1093                 class_export_put(exp);
1094         }
1095         EXIT;
1096 }
1097
1098 static inline int get_exp_flags_from_obd(struct obd_device *obd)
1099 {
1100         return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
1101                 (obd->obd_force ? OBD_OPT_FORCE : 0));
1102 }
1103
1104 void class_disconnect_exports(struct obd_device *obd)
1105 {
1106         struct list_head work_list;
1107         ENTRY;
1108
1109         /* Move all of the exports from obd_exports to a work list, en masse. */
1110         spin_lock(&obd->obd_dev_lock);
1111         list_add(&work_list, &obd->obd_exports);
1112         list_del_init(&obd->obd_exports);
1113         spin_unlock(&obd->obd_dev_lock);
1114
1115         if (!list_empty(&work_list)) {
1116                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1117                        "disconnecting them\n", obd->obd_minor, obd);
1118                 class_disconnect_export_list(&work_list,
1119                                              get_exp_flags_from_obd(obd));
1120         } else
1121                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1122                        obd->obd_minor, obd);
1123         EXIT;
1124 }
1125 EXPORT_SYMBOL(class_disconnect_exports);
1126
1127 /* Remove exports that have not completed recovery.
1128  */
1129 int class_disconnect_stale_exports(struct obd_device *obd,
1130                                    int (*test_export)(struct obd_export *))
1131 {
1132         struct list_head work_list;
1133         struct list_head *pos, *n;
1134         struct obd_export *exp;
1135         int cnt = 0;
1136         ENTRY;
1137
1138         CFS_INIT_LIST_HEAD(&work_list);
1139         spin_lock(&obd->obd_dev_lock);
1140         list_for_each_safe(pos, n, &obd->obd_exports) {
1141                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1142                 if (test_export(exp))
1143                         continue;
1144
1145                 list_del(&exp->exp_obd_chain);
1146                 list_add(&exp->exp_obd_chain, &work_list);
1147                 /* don't count self-export as client */
1148                 if (obd_uuid_equals(&exp->exp_client_uuid,
1149                                      &exp->exp_obd->obd_uuid))
1150                         continue;
1151
1152                 cnt++;
1153                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1154                        obd->obd_name, exp->exp_client_uuid.uuid,
1155                        exp->exp_connection == NULL ? "<unknown>" :
1156                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1157         }
1158         spin_unlock(&obd->obd_dev_lock);
1159
1160         CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
1161                obd->obd_name, cnt);
1162         class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
1163         RETURN(cnt);
1164 }
1165 EXPORT_SYMBOL(class_disconnect_stale_exports);
1166
1167 int oig_init(struct obd_io_group **oig_out)
1168 {
1169         struct obd_io_group *oig;
1170         ENTRY;
1171
1172         OBD_ALLOC(oig, sizeof(*oig));
1173         if (oig == NULL)
1174                 RETURN(-ENOMEM);
1175
1176         spin_lock_init(&oig->oig_lock);
1177         oig->oig_rc = 0;
1178         oig->oig_pending = 0;
1179         atomic_set(&oig->oig_refcount, 1);
1180         cfs_waitq_init(&oig->oig_waitq);
1181         CFS_INIT_LIST_HEAD(&oig->oig_occ_list);
1182
1183         *oig_out = oig;
1184         RETURN(0);
1185 };
1186 EXPORT_SYMBOL(oig_init);
1187
1188 static inline void oig_grab(struct obd_io_group *oig)
1189 {
1190         atomic_inc(&oig->oig_refcount);
1191 }
1192
1193 void oig_release(struct obd_io_group *oig)
1194 {
1195         if (atomic_dec_and_test(&oig->oig_refcount))
1196                 OBD_FREE(oig, sizeof(*oig));
1197 }
1198 EXPORT_SYMBOL(oig_release);
1199
1200 int oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ)
1201 {
1202         int rc = 0;
1203         CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
1204         spin_lock(&oig->oig_lock);
1205         if (oig->oig_rc) {
1206                 rc = oig->oig_rc;
1207         } else {
1208                 oig->oig_pending++;
1209                 if (occ != NULL)
1210                         list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
1211         }
1212         spin_unlock(&oig->oig_lock);
1213         oig_grab(oig);
1214
1215         return rc;
1216 }
1217 EXPORT_SYMBOL(oig_add_one);
1218
1219 void oig_complete_one(struct obd_io_group *oig,
1220                       struct oig_callback_context *occ, int rc)
1221 {
1222         cfs_waitq_t *wake = NULL;
1223         int old_rc;
1224
1225         spin_lock(&oig->oig_lock);
1226
1227         if (occ != NULL)
1228                 list_del_init(&occ->occ_oig_item);
1229
1230         old_rc = oig->oig_rc;
1231         if (oig->oig_rc == 0 && rc != 0)
1232                 oig->oig_rc = rc;
1233
1234         if (--oig->oig_pending <= 0)
1235                 wake = &oig->oig_waitq;
1236
1237         spin_unlock(&oig->oig_lock);
1238
1239         CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
1240                         "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
1241                         oig->oig_pending);
1242         if (wake)
1243                 cfs_waitq_signal(wake);
1244         oig_release(oig);
1245 }
1246 EXPORT_SYMBOL(oig_complete_one);
1247
1248 static int oig_done(struct obd_io_group *oig)
1249 {
1250         int rc = 0;
1251         spin_lock(&oig->oig_lock);
1252         if (oig->oig_pending <= 0)
1253                 rc = 1;
1254         spin_unlock(&oig->oig_lock);
1255         return rc;
1256 }
1257
1258 static void interrupted_oig(void *data)
1259 {
1260         struct obd_io_group *oig = data;
1261         struct oig_callback_context *occ;
1262
1263         spin_lock(&oig->oig_lock);
1264         /* We need to restart the processing each time we drop the lock, as
1265          * it is possible other threads called oig_complete_one() to remove
1266          * an entry elsewhere in the list while we dropped lock.  We need to
1267          * drop the lock because osc_ap_completion() calls oig_complete_one()
1268          * which re-gets this lock ;-) as well as a lock ordering issue. */
1269 restart:
1270         list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
1271                 if (occ->interrupted)
1272                         continue;
1273                 occ->interrupted = 1;
1274                 spin_unlock(&oig->oig_lock);
1275                 occ->occ_interrupted(occ);
1276                 spin_lock(&oig->oig_lock);
1277                 goto restart;
1278         }
1279         spin_unlock(&oig->oig_lock);
1280 }
1281
1282 int oig_wait(struct obd_io_group *oig)
1283 {
1284         struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
1285         int rc;
1286
1287         CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
1288
1289         do {
1290                 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
1291                 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
1292                 /* we can't continue until the oig has emptied and stopped
1293                  * referencing state that the caller will free upon return */
1294                 if (rc == -EINTR)
1295                         lwi = (struct l_wait_info){ 0, };
1296         } while (rc == -EINTR);
1297
1298         LASSERTF(oig->oig_pending == 0,
1299                  "exiting oig_wait(oig = %p) with %d pending\n", oig,
1300                  oig->oig_pending);
1301
1302         CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
1303         return oig->oig_rc;
1304 }
1305 EXPORT_SYMBOL(oig_wait);
1306
1307 void class_fail_export(struct obd_export *exp)
1308 {
1309         int rc, already_failed;
1310
1311         spin_lock(&exp->exp_lock);
1312         already_failed = exp->exp_failed;
1313         exp->exp_failed = 1;
1314         spin_unlock(&exp->exp_lock);
1315
1316         if (already_failed) {
1317                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1318                        exp, exp->exp_client_uuid.uuid);
1319                 return;
1320         }
1321
1322         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1323                exp, exp->exp_client_uuid.uuid);
1324
1325         if (obd_dump_on_timeout)
1326                 libcfs_debug_dumplog();
1327
1328         /* Most callers into obd_disconnect are removing their own reference
1329          * (request, for example) in addition to the one from the hash table.
1330          * We don't have such a reference here, so make one. */
1331         class_export_get(exp);
1332         rc = obd_disconnect(exp);
1333         if (rc)
1334                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1335         else
1336                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1337                        exp, exp->exp_client_uuid.uuid);
1338 }
1339 EXPORT_SYMBOL(class_fail_export);
1340
1341 char *obd_export_nid2str(struct obd_export *exp)
1342 {
1343         if (exp->exp_connection != NULL)
1344                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1345
1346         return "(no nid)";
1347 }
1348 EXPORT_SYMBOL(obd_export_nid2str);
1349
1350 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1351 {
1352         struct obd_export *doomed_exp = NULL;
1353         int exports_evicted = 0;
1354
1355         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1356
1357         do {
1358                 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1359                 if (doomed_exp == NULL)
1360                         break;
1361
1362                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1363                          "nid %s found, wanted nid %s, requested nid %s\n",
1364                          obd_export_nid2str(doomed_exp),
1365                          libcfs_nid2str(nid_key), nid);
1366                 LASSERTF(doomed_exp != obd->obd_self_export,
1367                          "self-export is hashed by NID?\n");
1368                 exports_evicted++;
1369                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1370                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1371                        exports_evicted);
1372                 class_fail_export(doomed_exp);
1373                 class_export_put(doomed_exp);
1374         } while (1);
1375
1376         if (!exports_evicted)
1377                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1378                        obd->obd_name, nid);
1379         return exports_evicted;
1380 }
1381 EXPORT_SYMBOL(obd_export_evict_by_nid);
1382
1383 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1384 {
1385         struct obd_export *doomed_exp = NULL;
1386         struct obd_uuid doomed_uuid;
1387         int exports_evicted = 0;
1388
1389         obd_str2uuid(&doomed_uuid, uuid);
1390         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1391                 CERROR("%s: can't evict myself\n", obd->obd_name);
1392                 return exports_evicted;
1393         }
1394
1395         doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1396
1397         if (doomed_exp == NULL) {
1398                 CERROR("%s: can't disconnect %s: no exports found\n",
1399                        obd->obd_name, uuid);
1400         } else {
1401                 CWARN("%s: evicting %s at adminstrative request\n",
1402                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1403                 class_fail_export(doomed_exp);
1404                 class_export_put(doomed_exp);
1405                 exports_evicted++;
1406         }
1407
1408         return exports_evicted;
1409 }
1410 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1411
1412 /**
1413  * kill zombie imports and exports
1414  */
1415 void obd_zombie_impexp_cull(void)
1416 {
1417         struct obd_import *import;
1418         struct obd_export *export;
1419         ENTRY;
1420
1421         do {
1422                 spin_lock (&obd_zombie_impexp_lock);
1423
1424                 import = NULL;
1425                 if (!list_empty(&obd_zombie_imports)) {
1426                         import = list_entry(obd_zombie_imports.next,
1427                                             struct obd_import,
1428                                             imp_zombie_chain);
1429                         list_del(&import->imp_zombie_chain);
1430                 }
1431
1432                 export = NULL;
1433                 if (!list_empty(&obd_zombie_exports)) {
1434                         export = list_entry(obd_zombie_exports.next,
1435                                             struct obd_export,
1436                                             exp_obd_chain);
1437                         list_del_init(&export->exp_obd_chain);
1438                 }
1439
1440                 spin_unlock(&obd_zombie_impexp_lock);
1441
1442                 if (import != NULL)
1443                         class_import_destroy(import);
1444
1445                 if (export != NULL)
1446                         class_export_destroy(export);
1447
1448         } while (import != NULL || export != NULL);
1449         EXIT;
1450 }
1451
1452 static struct completion        obd_zombie_start;
1453 static struct completion        obd_zombie_stop;
1454 static unsigned long            obd_zombie_flags;
1455 static cfs_waitq_t              obd_zombie_waitq;
1456
1457 enum {
1458         OBD_ZOMBIE_STOP = 1
1459 };
1460
1461 /**
1462  * check for work for kill zombie import/export thread.
1463  */
1464 int obd_zombie_impexp_check(void *arg)
1465 {
1466         int rc;
1467
1468         spin_lock(&obd_zombie_impexp_lock);
1469         rc = list_empty(&obd_zombie_imports) &&
1470              list_empty(&obd_zombie_exports) &&
1471              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1472
1473         spin_unlock(&obd_zombie_impexp_lock);
1474
1475         RETURN(rc);
1476 }
1477
1478 /**
1479  * notify import/export destroy thread about new zombie.
1480  */
1481 static void obd_zombie_impexp_notify(void)
1482 {
1483         cfs_waitq_signal(&obd_zombie_waitq);
1484 }
1485
1486 #ifdef __KERNEL__
1487
1488 /**
1489  * destroy zombie export/import thread.
1490  */
1491 static int obd_zombie_impexp_thread(void *unused)
1492 {
1493         int rc;
1494
1495         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1496                 complete(&obd_zombie_start);
1497                 RETURN(rc);
1498         }
1499
1500         complete(&obd_zombie_start);
1501
1502         while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1503                 struct l_wait_info lwi = { 0 };
1504
1505                 l_wait_event(obd_zombie_waitq, !obd_zombie_impexp_check(NULL), &lwi);
1506
1507                 obd_zombie_impexp_cull();
1508         }
1509
1510         complete(&obd_zombie_stop);
1511
1512         RETURN(0);
1513 }
1514
1515 #else /* ! KERNEL */
1516
1517 static atomic_t zombie_recur = ATOMIC_INIT(0);
1518 static void *obd_zombie_impexp_work_cb;
1519 static void *obd_zombie_impexp_idle_cb;
1520
1521 int obd_zombie_impexp_kill(void *arg)
1522 {
1523         int rc = 0;
1524
1525         if (atomic_inc_return(&zombie_recur) == 1) {
1526                 obd_zombie_impexp_cull();
1527                 rc = 1;
1528         }
1529         atomic_dec(&zombie_recur);
1530         return rc;
1531 }
1532
1533 #endif
1534
1535 /**
1536  * start destroy zombie import/export thread
1537  */
1538 int obd_zombie_impexp_init(void)
1539 {
1540         int rc;
1541
1542         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1543         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1544         spin_lock_init(&obd_zombie_impexp_lock);
1545         init_completion(&obd_zombie_start);
1546         init_completion(&obd_zombie_stop);
1547         cfs_waitq_init(&obd_zombie_waitq);
1548
1549 #ifdef __KERNEL__
1550         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1551         if (rc < 0)
1552                 RETURN(rc);
1553
1554         wait_for_completion(&obd_zombie_start);
1555 #else
1556
1557         obd_zombie_impexp_work_cb =
1558                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1559                                                  &obd_zombie_impexp_kill, NULL);
1560
1561         obd_zombie_impexp_idle_cb =
1562                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1563                                                  &obd_zombie_impexp_check, NULL);
1564         rc = 0;
1565
1566 #endif
1567         RETURN(rc);
1568 }
1569 /**
1570  * stop destroy zombie import/export thread
1571  */
1572 void obd_zombie_impexp_stop(void)
1573 {
1574         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1575         obd_zombie_impexp_notify();
1576 #ifdef __KERNEL__
1577         wait_for_completion(&obd_zombie_stop);
1578 #else
1579         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1580         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1581 #endif
1582 }