Whamcloud - gitweb
get rid of trailing whitespaces.
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see [sun.com URL with a
20  * copy of GPLv2].
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
50
51 extern struct list_head obd_types;
52 spinlock_t obd_types_lock;
53
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
58
59 struct list_head  obd_zombie_imports;
60 struct list_head  obd_zombie_exports;
61 spinlock_t        obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
63
64 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
65
66 /*
67  * support functions: we could use inter-module communication, but this
68  * is more portable to other OS's
69  */
70 static struct obd_device *obd_device_alloc(void)
71 {
72         struct obd_device *obd;
73
74         OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
75         if (obd != NULL) {
76                 obd->obd_magic = OBD_DEVICE_MAGIC;
77         }
78         return obd;
79 }
80 EXPORT_SYMBOL(obd_device_alloc);
81
82 static void obd_device_free(struct obd_device *obd)
83 {
84         LASSERT(obd != NULL);
85         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
86                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
87         if (obd->obd_namespace != NULL) {
88                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89                        obd, obd->obd_namespace, obd->obd_force);
90                 LBUG();
91         }
92         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
93 }
94 EXPORT_SYMBOL(obd_device_free);
95
96 struct obd_type *class_search_type(const char *name)
97 {
98         struct list_head *tmp;
99         struct obd_type *type;
100
101         spin_lock(&obd_types_lock);
102         list_for_each(tmp, &obd_types) {
103                 type = list_entry(tmp, struct obd_type, typ_chain);
104                 if (strcmp(type->typ_name, name) == 0) {
105                         spin_unlock(&obd_types_lock);
106                         return type;
107                 }
108         }
109         spin_unlock(&obd_types_lock);
110         return NULL;
111 }
112
113 struct obd_type *class_get_type(const char *name)
114 {
115         struct obd_type *type = class_search_type(name);
116
117 #ifdef CONFIG_KMOD
118         if (!type) {
119                 const char *modname = name;
120                 if (!request_module(modname)) {
121                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
122                         type = class_search_type(name);
123                 } else {
124                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
125                                            modname);
126                 }
127         }
128 #endif
129         if (type) {
130                 spin_lock(&type->obd_type_lock);
131                 type->typ_refcnt++;
132                 try_module_get(type->typ_dt_ops->o_owner);
133                 spin_unlock(&type->obd_type_lock);
134         }
135         return type;
136 }
137
138 void class_put_type(struct obd_type *type)
139 {
140         LASSERT(type);
141         spin_lock(&type->obd_type_lock);
142         type->typ_refcnt--;
143         module_put(type->typ_dt_ops->o_owner);
144         spin_unlock(&type->obd_type_lock);
145 }
146
147 #define CLASS_MAX_NAME 1024
148
149 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
150                         struct lprocfs_vars *vars, const char *name,
151                         struct lu_device_type *ldt)
152 {
153         struct obd_type *type;
154         int rc = 0;
155         ENTRY;
156
157         /* sanity check */
158         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
159
160         if (class_search_type(name)) {
161                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
162                 RETURN(-EEXIST);
163         }
164
165         rc = -ENOMEM;
166         OBD_ALLOC(type, sizeof(*type));
167         if (type == NULL)
168                 RETURN(rc);
169
170         OBD_ALLOC_PTR(type->typ_dt_ops);
171         OBD_ALLOC_PTR(type->typ_md_ops);
172         OBD_ALLOC(type->typ_name, strlen(name) + 1);
173
174         if (type->typ_dt_ops == NULL ||
175             type->typ_md_ops == NULL ||
176             type->typ_name == NULL)
177                 GOTO (failed, rc);
178
179         *(type->typ_dt_ops) = *dt_ops;
180         /* md_ops is optional */
181         if (md_ops)
182                 *(type->typ_md_ops) = *md_ops;
183         strcpy(type->typ_name, name);
184         spin_lock_init(&type->obd_type_lock);
185
186 #ifdef LPROCFS
187         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
188                                               vars, type);
189         if (IS_ERR(type->typ_procroot)) {
190                 rc = PTR_ERR(type->typ_procroot);
191                 type->typ_procroot = NULL;
192                 GOTO (failed, rc);
193         }
194 #endif
195         if (ldt != NULL) {
196                 type->typ_lu = ldt;
197                 rc = ldt->ldt_ops->ldto_init(ldt);
198                 if (rc != 0)
199                         GOTO (failed, rc);
200         }
201
202         spin_lock(&obd_types_lock);
203         list_add(&type->typ_chain, &obd_types);
204         spin_unlock(&obd_types_lock);
205
206         RETURN (0);
207
208  failed:
209         if (type->typ_name != NULL)
210                 OBD_FREE(type->typ_name, strlen(name) + 1);
211         if (type->typ_md_ops != NULL)
212                 OBD_FREE_PTR(type->typ_md_ops);
213         if (type->typ_dt_ops != NULL)
214                 OBD_FREE_PTR(type->typ_dt_ops);
215         OBD_FREE(type, sizeof(*type));
216         RETURN(rc);
217 }
218
219 int class_unregister_type(const char *name)
220 {
221         struct obd_type *type = class_search_type(name);
222         ENTRY;
223
224         if (!type) {
225                 CERROR("unknown obd type\n");
226                 RETURN(-EINVAL);
227         }
228
229         if (type->typ_refcnt) {
230                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
231                 /* This is a bad situation, let's make the best of it */
232                 /* Remove ops, but leave the name for debugging */
233                 OBD_FREE_PTR(type->typ_dt_ops);
234                 OBD_FREE_PTR(type->typ_md_ops);
235                 RETURN(-EBUSY);
236         }
237
238         if (type->typ_procroot) {
239                 lprocfs_remove(&type->typ_procroot);
240         }
241
242         if (type->typ_lu)
243                 type->typ_lu->ldt_ops->ldto_fini(type->typ_lu);
244
245         spin_lock(&obd_types_lock);
246         list_del(&type->typ_chain);
247         spin_unlock(&obd_types_lock);
248         OBD_FREE(type->typ_name, strlen(name) + 1);
249         if (type->typ_dt_ops != NULL)
250                 OBD_FREE_PTR(type->typ_dt_ops);
251         if (type->typ_md_ops != NULL)
252                 OBD_FREE_PTR(type->typ_md_ops);
253         OBD_FREE(type, sizeof(*type));
254         RETURN(0);
255 } /* class_unregister_type */
256
257 /**
258  * Create a new obd device.
259  *
260  * Find an empty slot in ::obd_devs[], create a new obd device in it.
261  *
262  * \param typename [in] obd device type string.
263  * \param name     [in] obd device name.
264  *
265  * \retval NULL if create fails, otherwise return the obd device
266  *         pointer created.
267  */
268 struct obd_device *class_newdev(const char *type_name, const char *name)
269 {
270         struct obd_device *result = NULL;
271         struct obd_device *newdev;
272         struct obd_type *type = NULL;
273         int i;
274         int new_obd_minor = 0;
275
276         if (strlen(name) >= MAX_OBD_NAME) {
277                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
278                 RETURN(ERR_PTR(-EINVAL));
279         }
280
281         type = class_get_type(type_name);
282         if (type == NULL){
283                 CERROR("OBD: unknown type: %s\n", type_name);
284                 RETURN(ERR_PTR(-ENODEV));
285         }
286
287         newdev = obd_device_alloc();
288         if (newdev == NULL) {
289                 class_put_type(type);
290                 RETURN(ERR_PTR(-ENOMEM));
291         }
292         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
293
294         spin_lock(&obd_dev_lock);
295         for (i = 0; i < class_devno_max(); i++) {
296                 struct obd_device *obd = class_num2obd(i);
297                 if (obd && obd->obd_name &&
298                     (strcmp(name, obd->obd_name) == 0)) {
299                         CERROR("Device %s already exists, won't add\n", name);
300                         if (result) {
301                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
302                                          "%p obd_magic %08x != %08x\n", result,
303                                          result->obd_magic, OBD_DEVICE_MAGIC);
304                                 LASSERTF(result->obd_minor == new_obd_minor,
305                                          "%p obd_minor %d != %d\n", result,
306                                          result->obd_minor, new_obd_minor);
307
308                                 obd_devs[result->obd_minor] = NULL;
309                                 result->obd_name[0]='\0';
310                          }
311                         result = ERR_PTR(-EEXIST);
312                         break;
313                 }
314                 if (!result && !obd) {
315                         result = newdev;
316                         result->obd_minor = i;
317                         new_obd_minor = i;
318                         result->obd_type = type;
319                         strncpy(result->obd_name, name,
320                                 sizeof(result->obd_name));
321                         obd_devs[i] = result;
322                 }
323         }
324         spin_unlock(&obd_dev_lock);
325
326         if (result == NULL && i >= class_devno_max()) {
327                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
328                        class_devno_max());
329                 result = ERR_PTR(-EOVERFLOW);
330         }
331
332         if (IS_ERR(result)) {
333                 obd_device_free(newdev);
334                 class_put_type(type);
335         } else {
336                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
337                        result->obd_name, result);
338         }
339         return result;
340 }
341
342 void class_release_dev(struct obd_device *obd)
343 {
344         struct obd_type *obd_type = obd->obd_type;
345
346         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
347                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
348         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
349                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
350         LASSERT(obd_type != NULL);
351
352         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
353                obd->obd_name,obd->obd_type->typ_name);
354
355         spin_lock(&obd_dev_lock);
356         obd_devs[obd->obd_minor] = NULL;
357         spin_unlock(&obd_dev_lock);
358         obd_device_free(obd);
359
360         class_put_type(obd_type);
361 }
362
363 int class_name2dev(const char *name)
364 {
365         int i;
366
367         if (!name)
368                 return -1;
369
370         spin_lock(&obd_dev_lock);
371         for (i = 0; i < class_devno_max(); i++) {
372                 struct obd_device *obd = class_num2obd(i);
373                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
374                         /* Make sure we finished attaching before we give
375                            out any references */
376                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
377                         if (obd->obd_attached) {
378                                 spin_unlock(&obd_dev_lock);
379                                 return i;
380                         }
381                         break;
382                 }
383         }
384         spin_unlock(&obd_dev_lock);
385
386         return -1;
387 }
388
389 struct obd_device *class_name2obd(const char *name)
390 {
391         int dev = class_name2dev(name);
392
393         if (dev < 0 || dev > class_devno_max())
394                 return NULL;
395         return class_num2obd(dev);
396 }
397
398 int class_uuid2dev(struct obd_uuid *uuid)
399 {
400         int i;
401
402         spin_lock(&obd_dev_lock);
403         for (i = 0; i < class_devno_max(); i++) {
404                 struct obd_device *obd = class_num2obd(i);
405                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
406                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
407                         spin_unlock(&obd_dev_lock);
408                         return i;
409                 }
410         }
411         spin_unlock(&obd_dev_lock);
412
413         return -1;
414 }
415
416 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
417 {
418         int dev = class_uuid2dev(uuid);
419         if (dev < 0)
420                 return NULL;
421         return class_num2obd(dev);
422 }
423
424 /**
425  * Get obd device from ::obd_devs[]
426  *
427  * \param num [in] array index
428  *
429  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
430  *         otherwise return the obd device there.
431  */
432 struct obd_device *class_num2obd(int num)
433 {
434         struct obd_device *obd = NULL;
435
436         if (num < class_devno_max()) {
437                 obd = obd_devs[num];
438                 if (obd == NULL)
439                         return NULL;
440
441                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
442                          "%p obd_magic %08x != %08x\n",
443                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
444                 LASSERTF(obd->obd_minor == num,
445                          "%p obd_minor %0d != %0d\n",
446                          obd, obd->obd_minor, num);
447         }
448
449         return obd;
450 }
451
452 void class_obd_list(void)
453 {
454         char *status;
455         int i;
456
457         spin_lock(&obd_dev_lock);
458         for (i = 0; i < class_devno_max(); i++) {
459                 struct obd_device *obd = class_num2obd(i);
460                 if (obd == NULL)
461                         continue;
462                 if (obd->obd_stopping)
463                         status = "ST";
464                 else if (obd->obd_set_up)
465                         status = "UP";
466                 else if (obd->obd_attached)
467                         status = "AT";
468                 else
469                         status = "--";
470                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
471                          i, status, obd->obd_type->typ_name,
472                          obd->obd_name, obd->obd_uuid.uuid,
473                          atomic_read(&obd->obd_refcount));
474         }
475         spin_unlock(&obd_dev_lock);
476         return;
477 }
478
479 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
480    specified, then only the client with that uuid is returned,
481    otherwise any client connected to the tgt is returned. */
482 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
483                                           const char * typ_name,
484                                           struct obd_uuid *grp_uuid)
485 {
486         int i;
487
488         spin_lock(&obd_dev_lock);
489         for (i = 0; i < class_devno_max(); i++) {
490                 struct obd_device *obd = class_num2obd(i);
491                 if (obd == NULL)
492                         continue;
493                 if ((strncmp(obd->obd_type->typ_name, typ_name,
494                              strlen(typ_name)) == 0)) {
495                         if (obd_uuid_equals(tgt_uuid,
496                                             &obd->u.cli.cl_target_uuid) &&
497                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
498                                                          &obd->obd_uuid) : 1)) {
499                                 spin_unlock(&obd_dev_lock);
500                                 return obd;
501                         }
502                 }
503         }
504         spin_unlock(&obd_dev_lock);
505
506         return NULL;
507 }
508
509 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
510                                             struct obd_uuid *grp_uuid)
511 {
512         struct obd_device *obd;
513
514         obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
515         if (!obd)
516                 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
517                                             grp_uuid);
518         return obd;
519 }
520
521 /* Iterate the obd_device list looking devices have grp_uuid. Start
522    searching at *next, and if a device is found, the next index to look
523    at is saved in *next. If next is NULL, then the first matching device
524    will always be returned. */
525 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
526 {
527         int i;
528
529         if (next == NULL)
530                 i = 0;
531         else if (*next >= 0 && *next < class_devno_max())
532                 i = *next;
533         else
534                 return NULL;
535
536         spin_lock(&obd_dev_lock);
537         for (; i < class_devno_max(); i++) {
538                 struct obd_device *obd = class_num2obd(i);
539                 if (obd == NULL)
540                         continue;
541                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
542                         if (next != NULL)
543                                 *next = i+1;
544                         spin_unlock(&obd_dev_lock);
545                         return obd;
546                 }
547         }
548         spin_unlock(&obd_dev_lock);
549
550         return NULL;
551 }
552
553
554 void obd_cleanup_caches(void)
555 {
556         int rc;
557
558         ENTRY;
559         if (obd_device_cachep) {
560                 rc = cfs_mem_cache_destroy(obd_device_cachep);
561                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
562                 obd_device_cachep = NULL;
563         }
564         if (obdo_cachep) {
565                 rc = cfs_mem_cache_destroy(obdo_cachep);
566                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
567                 obdo_cachep = NULL;
568         }
569         if (import_cachep) {
570                 rc = cfs_mem_cache_destroy(import_cachep);
571                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
572                 import_cachep = NULL;
573         }
574         if (capa_cachep) {
575                 rc = cfs_mem_cache_destroy(capa_cachep);
576                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
577                 capa_cachep = NULL;
578         }
579         EXIT;
580 }
581
582 int obd_init_caches(void)
583 {
584         ENTRY;
585
586         LASSERT(obd_device_cachep == NULL);
587         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
588                                                  sizeof(struct obd_device),
589                                                  0, 0);
590         if (!obd_device_cachep)
591                 GOTO(out, -ENOMEM);
592
593         LASSERT(obdo_cachep == NULL);
594         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
595                                            0, 0);
596         if (!obdo_cachep)
597                 GOTO(out, -ENOMEM);
598
599         LASSERT(import_cachep == NULL);
600         import_cachep = cfs_mem_cache_create("ll_import_cache",
601                                              sizeof(struct obd_import),
602                                              0, 0);
603         if (!import_cachep)
604                 GOTO(out, -ENOMEM);
605
606         LASSERT(capa_cachep == NULL);
607         capa_cachep = cfs_mem_cache_create("capa_cache",
608                                            sizeof(struct obd_capa), 0, 0);
609         if (!capa_cachep)
610                 GOTO(out, -ENOMEM);
611
612         RETURN(0);
613  out:
614         obd_cleanup_caches();
615         RETURN(-ENOMEM);
616
617 }
618
619 /* map connection to client */
620 struct obd_export *class_conn2export(struct lustre_handle *conn)
621 {
622         struct obd_export *export;
623         ENTRY;
624
625         if (!conn) {
626                 CDEBUG(D_CACHE, "looking for null handle\n");
627                 RETURN(NULL);
628         }
629
630         if (conn->cookie == -1) {  /* this means assign a new connection */
631                 CDEBUG(D_CACHE, "want a new connection\n");
632                 RETURN(NULL);
633         }
634
635         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
636         export = class_handle2object(conn->cookie);
637         RETURN(export);
638 }
639
640 struct obd_device *class_exp2obd(struct obd_export *exp)
641 {
642         if (exp)
643                 return exp->exp_obd;
644         return NULL;
645 }
646
647 struct obd_device *class_conn2obd(struct lustre_handle *conn)
648 {
649         struct obd_export *export;
650         export = class_conn2export(conn);
651         if (export) {
652                 struct obd_device *obd = export->exp_obd;
653                 class_export_put(export);
654                 return obd;
655         }
656         return NULL;
657 }
658
659 struct obd_import *class_exp2cliimp(struct obd_export *exp)
660 {
661         struct obd_device *obd = exp->exp_obd;
662         if (obd == NULL)
663                 return NULL;
664         return obd->u.cli.cl_import;
665 }
666
667 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
668 {
669         struct obd_device *obd = class_conn2obd(conn);
670         if (obd == NULL)
671                 return NULL;
672         return obd->u.cli.cl_import;
673 }
674
/* Export management functions */

/* Add-reference callback passed to class_handle_hash() for exports. */
static void export_handle_addref(void *exp)
{
        class_export_get(exp);
}
680
681 void __class_export_put(struct obd_export *exp)
682 {
683         if (atomic_dec_and_test(&exp->exp_refcount)) {
684                 LASSERT (list_empty(&exp->exp_obd_chain));
685
686                 CDEBUG(D_IOCTL, "final put %p/%s\n",
687                        exp, exp->exp_client_uuid.uuid);
688
689                 spin_lock(&obd_zombie_impexp_lock);
690                 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
691                 spin_unlock(&obd_zombie_impexp_lock);
692
693                 if (obd_zombie_impexp_notify != NULL)
694                         obd_zombie_impexp_notify();
695         }
696 }
697 EXPORT_SYMBOL(__class_export_put);
698
699 void class_export_destroy(struct obd_export *exp)
700 {
701         struct obd_device *obd = exp->exp_obd;
702         ENTRY;
703
704         LASSERT (atomic_read(&exp->exp_refcount) == 0);
705
706         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
707                exp->exp_client_uuid.uuid, obd->obd_name);
708
709         LASSERT(obd != NULL);
710
711         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
712         if (exp->exp_connection)
713                 ptlrpc_put_connection_superhack(exp->exp_connection);
714
715         LASSERT(list_empty(&exp->exp_outstanding_replies));
716         LASSERT(list_empty(&exp->exp_req_replay_queue));
717         obd_destroy_export(exp);
718
719         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
720         class_decref(obd);
721         EXIT;
722 }
723
724 /* Creates a new export, adds it to the hash table, and returns a
725  * pointer to it. The refcount is 2: one for the hash reference, and
726  * one for the pointer returned by this function. */
727 struct obd_export *class_new_export(struct obd_device *obd,
728                                     struct obd_uuid *cluuid)
729 {
730         struct obd_export *export;
731         int rc = 0;
732
733         OBD_ALLOC_PTR(export);
734         if (!export)
735                 return ERR_PTR(-ENOMEM);
736
737         export->exp_conn_cnt = 0;
738         atomic_set(&export->exp_refcount, 2);
739         atomic_set(&export->exp_rpc_count, 0);
740         export->exp_obd = obd;
741         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
742         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
743         /* XXX this should be in LDLM init */
744         CFS_INIT_LIST_HEAD(&export->exp_ldlm_data.led_held_locks);
745         spin_lock_init(&export->exp_ldlm_data.led_lock);
746
747         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
748         class_handle_hash(&export->exp_handle, export_handle_addref);
749         export->exp_last_request_time = cfs_time_current_sec();
750         spin_lock_init(&export->exp_lock);
751         INIT_HLIST_NODE(&export->exp_uuid_hash);
752         INIT_HLIST_NODE(&export->exp_nid_hash);
753
754         export->exp_sp_peer = LUSTRE_SP_ANY;
755         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
756         export->exp_client_uuid = *cluuid;
757         obd_init_export(export);
758
759         spin_lock(&obd->obd_dev_lock);
760         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
761                rc = lustre_hash_additem_unique(obd->obd_uuid_hash_body, cluuid,
762                                                &export->exp_uuid_hash);
763                if (rc != 0) {
764                        CWARN("%s: denying duplicate export for %s\n",
765                              obd->obd_name, cluuid->uuid);
766                        spin_unlock(&obd->obd_dev_lock);
767                        class_handle_unhash(&export->exp_handle);
768                        OBD_FREE_PTR(export);
769                        return ERR_PTR(-EALREADY);
770                }
771         }
772
773         LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
774         class_incref(obd);
775         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
776         list_add_tail(&export->exp_obd_chain_timed,
777                       &export->exp_obd->obd_exports_timed);
778         export->exp_obd->obd_num_exports++;
779         spin_unlock(&obd->obd_dev_lock);
780
781         return export;
782 }
783 EXPORT_SYMBOL(class_new_export);
784
785 void class_unlink_export(struct obd_export *exp)
786 {
787         class_handle_unhash(&exp->exp_handle);
788
789         spin_lock(&exp->exp_obd->obd_dev_lock);
790         /* delete an uuid-export hashitem from hashtables */
791         if (!hlist_unhashed(&exp->exp_uuid_hash)) {
792                 lustre_hash_delitem(exp->exp_obd->obd_uuid_hash_body,
793                                     &exp->exp_client_uuid, &exp->exp_uuid_hash);
794         }
795         list_del_init(&exp->exp_obd_chain);
796         list_del_init(&exp->exp_obd_chain_timed);
797         exp->exp_obd->obd_num_exports--;
798         spin_unlock(&exp->exp_obd->obd_dev_lock);
799
800         class_export_put(exp);
801 }
802 EXPORT_SYMBOL(class_unlink_export);
803
/* Import management functions */

/* Add-reference callback passed to class_handle_hash() for imports. */
static void import_handle_addref(void *imp)
{
        class_import_get(imp);
}
809
810 struct obd_import *class_import_get(struct obd_import *import)
811 {
812         LASSERT(atomic_read(&import->imp_refcount) >= 0);
813         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
814         atomic_inc(&import->imp_refcount);
815         CDEBUG(D_INFO, "import %p refcount=%d\n", import,
816                atomic_read(&import->imp_refcount));
817         return import;
818 }
819 EXPORT_SYMBOL(class_import_get);
820
821 void class_import_put(struct obd_import *import)
822 {
823         ENTRY;
824
825         CDEBUG(D_INFO, "import %p refcount=%d\n", import,
826                atomic_read(&import->imp_refcount) - 1);
827
828         LASSERT(atomic_read(&import->imp_refcount) > 0);
829         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
830         LASSERT(list_empty(&import->imp_zombie_chain));
831
832         if (atomic_dec_and_test(&import->imp_refcount)) {
833
834                 CDEBUG(D_INFO, "final put import %p\n", import);
835
836                 spin_lock(&obd_zombie_impexp_lock);
837                 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
838                 spin_unlock(&obd_zombie_impexp_lock);
839
840                 if (obd_zombie_impexp_notify != NULL)
841                         obd_zombie_impexp_notify();
842         }
843
844         EXIT;
845 }
846 EXPORT_SYMBOL(class_import_put);
847
848 void class_import_destroy(struct obd_import *import)
849 {
850         ENTRY;
851
852         CDEBUG(D_IOCTL, "destroying import %p for %s\n", import,
853                 import->imp_obd->obd_name);
854
855         LASSERT(atomic_read(&import->imp_refcount) == 0);
856
857         ptlrpc_put_connection_superhack(import->imp_connection);
858
859         while (!list_empty(&import->imp_conn_list)) {
860                 struct obd_import_conn *imp_conn;
861
862                 imp_conn = list_entry(import->imp_conn_list.next,
863                                       struct obd_import_conn, oic_item);
864                 list_del(&imp_conn->oic_item);
865                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
866                 OBD_FREE(imp_conn, sizeof(*imp_conn));
867         }
868
869         LASSERT(import->imp_sec == NULL);
870         class_decref(import->imp_obd);
871         OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
872         EXIT;
873 }
874
875 static void init_imp_at(struct imp_at *at) {
876         int i;
877         at_init(&at->iat_net_latency, 0, 0);
878         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
879                 /* max service estimates are tracked on the server side, so
880                    don't use the AT history here, just use the last reported
881                    val. (But keep hist for proc histogram, worst_ever) */
882                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
883                         AT_FLG_NOHIST);
884         }
885 }
886
887 struct obd_import *class_new_import(struct obd_device *obd)
888 {
889         struct obd_import *imp;
890
891         OBD_ALLOC(imp, sizeof(*imp));
892         if (imp == NULL)
893                 return NULL;
894
895         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
896         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
897         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
898         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
899         spin_lock_init(&imp->imp_lock);
900         imp->imp_last_success_conn = 0;
901         imp->imp_state = LUSTRE_IMP_NEW;
902         imp->imp_obd = class_incref(obd);
903         sema_init(&imp->imp_sec_mutex, 1);
904         cfs_waitq_init(&imp->imp_recovery_waitq);
905
906         atomic_set(&imp->imp_refcount, 2);
907         atomic_set(&imp->imp_inflight, 0);
908         atomic_set(&imp->imp_replay_inflight, 0);
909         atomic_set(&imp->imp_inval_count, 0);
910         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
911         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
912         class_handle_hash(&imp->imp_handle, import_handle_addref);
913         init_imp_at(&imp->imp_at);
914
915         /* the default magic is V2, will be used in connect RPC, and
916          * then adjusted according to the flags in request/reply. */
917         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
918
919         return imp;
920 }
921 EXPORT_SYMBOL(class_new_import);
922
923 void class_destroy_import(struct obd_import *import)
924 {
925         LASSERT(import != NULL);
926         LASSERT(import != LP_POISON);
927
928         class_handle_unhash(&import->imp_handle);
929
930         spin_lock(&import->imp_lock);
931         import->imp_generation++;
932         spin_unlock(&import->imp_lock);
933         class_import_put(import);
934 }
935 EXPORT_SYMBOL(class_destroy_import);
936
937 /* A connection defines an export context in which preallocation can
938    be managed. This releases the export pointer reference, and returns
939    the export handle, so the export refcount is 1 when this function
940    returns. */
941 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
942                   struct obd_uuid *cluuid)
943 {
944         struct obd_export *export;
945         LASSERT(conn != NULL);
946         LASSERT(obd != NULL);
947         LASSERT(cluuid != NULL);
948         ENTRY;
949
950         export = class_new_export(obd, cluuid);
951         if (IS_ERR(export))
952                 RETURN(PTR_ERR(export));
953
954         conn->cookie = export->exp_handle.h_cookie;
955         class_export_put(export);
956
957         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
958                cluuid->uuid, conn->cookie);
959         RETURN(0);
960 }
961 EXPORT_SYMBOL(class_connect);
962
963 /* if export is involved in recovery then clean up related things */
964 void class_export_recovery_cleanup(struct obd_export *exp)
965 {
966         struct obd_device *obd = exp->exp_obd;
967
968         spin_lock_bh(&obd->obd_processing_task_lock);
969         if (obd->obd_recovering && exp->exp_in_recovery) {
970                 spin_lock(&exp->exp_lock);
971                 exp->exp_in_recovery = 0;
972                 spin_unlock(&exp->exp_lock);
973                 obd->obd_connected_clients--;
974                 /* each connected client is counted as recoverable */
975                 obd->obd_recoverable_clients--;
976                 if (exp->exp_req_replay_needed) {
977                         spin_lock(&exp->exp_lock);
978                         exp->exp_req_replay_needed = 0;
979                         spin_unlock(&exp->exp_lock);
980                         LASSERT(atomic_read(&obd->obd_req_replay_clients));
981                         atomic_dec(&obd->obd_req_replay_clients);
982                 }
983                 if (exp->exp_lock_replay_needed) {
984                         spin_lock(&exp->exp_lock);
985                         exp->exp_lock_replay_needed = 0;
986                         spin_unlock(&exp->exp_lock);
987                         LASSERT(atomic_read(&obd->obd_lock_replay_clients));
988                         atomic_dec(&obd->obd_lock_replay_clients);
989                 }
990         }
991         spin_unlock_bh(&obd->obd_processing_task_lock);
992 }
993
994 /* This function removes two references from the export: one for the
995  * hash entry and one for the export pointer passed in.  The export
996  * pointer passed to this function is destroyed should not be used
997  * again. */
998 int class_disconnect(struct obd_export *export)
999 {
1000         int already_disconnected;
1001         ENTRY;
1002
1003         if (export == NULL) {
1004                 fixme();
1005                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1006                 RETURN(-EINVAL);
1007         }
1008
1009         spin_lock(&export->exp_lock);
1010         already_disconnected = export->exp_disconnected;
1011         export->exp_disconnected = 1;
1012
1013         if (!hlist_unhashed(&export->exp_nid_hash)) {
1014                 lustre_hash_delitem(export->exp_obd->obd_nid_hash_body,
1015                                     &export->exp_connection->c_peer.nid, &export->exp_nid_hash);
1016         }
1017         spin_unlock(&export->exp_lock);
1018
1019         /* class_cleanup(), abort_recovery(), and class_fail_export()
1020          * all end up in here, and if any of them race we shouldn't
1021          * call extra class_export_puts(). */
1022         if (already_disconnected)
1023                 RETURN(0);
1024
1025         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1026                export->exp_handle.h_cookie);
1027
1028         class_export_recovery_cleanup(export);
1029         class_unlink_export(export);
1030         class_export_put(export);
1031         RETURN(0);
1032 }
1033
1034 static void class_disconnect_export_list(struct list_head *list, int flags)
1035 {
1036         int rc;
1037         struct lustre_handle fake_conn;
1038         struct obd_export *fake_exp, *exp;
1039         ENTRY;
1040
1041         /* It's possible that an export may disconnect itself, but
1042          * nothing else will be added to this list. */
1043         while (!list_empty(list)) {
1044                 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1045                 class_export_get(exp);
1046
1047                 spin_lock(&exp->exp_lock);
1048                 exp->exp_flags = flags;
1049                 spin_unlock(&exp->exp_lock);
1050
1051                 if (obd_uuid_equals(&exp->exp_client_uuid,
1052                                     &exp->exp_obd->obd_uuid)) {
1053                         CDEBUG(D_HA,
1054                                "exp %p export uuid == obd uuid, don't discon\n",
1055                                exp);
1056                         /* Need to delete this now so we don't end up pointing
1057                          * to work_list later when this export is cleaned up. */
1058                         list_del_init(&exp->exp_obd_chain);
1059                         class_export_put(exp);
1060                         continue;
1061                 }
1062
1063                 fake_conn.cookie = exp->exp_handle.h_cookie;
1064                 fake_exp = class_conn2export(&fake_conn);
1065                 if (!fake_exp) {
1066                         class_export_put(exp);
1067                         continue;
1068                 }
1069
1070                 spin_lock(&fake_exp->exp_lock);
1071                 fake_exp->exp_flags = flags;
1072                 spin_unlock(&fake_exp->exp_lock);
1073
1074                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1075                        "last request at %ld\n",
1076                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1077                        exp, exp->exp_last_request_time);
1078                 rc = obd_disconnect(fake_exp);
1079                 class_export_put(exp);
1080         }
1081         EXIT;
1082 }
1083
1084 static inline int get_exp_flags_from_obd(struct obd_device *obd)
1085 {
1086         return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
1087                 (obd->obd_force ? OBD_OPT_FORCE : 0));
1088 }
1089
1090 void class_disconnect_exports(struct obd_device *obd)
1091 {
1092         struct list_head work_list;
1093         ENTRY;
1094
1095         /* Move all of the exports from obd_exports to a work list, en masse. */
1096         spin_lock(&obd->obd_dev_lock);
1097         list_add(&work_list, &obd->obd_exports);
1098         list_del_init(&obd->obd_exports);
1099         spin_unlock(&obd->obd_dev_lock);
1100
1101         if (!list_empty(&work_list)) {
1102                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1103                        "disconnecting them\n", obd->obd_minor, obd);
1104                 class_disconnect_export_list(&work_list,
1105                                              get_exp_flags_from_obd(obd));
1106         } else
1107                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1108                        obd->obd_minor, obd);
1109         EXIT;
1110 }
1111 EXPORT_SYMBOL(class_disconnect_exports);
1112
1113 /* Remove exports that have not completed recovery.
1114  */
1115 int class_disconnect_stale_exports(struct obd_device *obd,
1116                                    int (*test_export)(struct obd_export *))
1117 {
1118         struct list_head work_list;
1119         struct list_head *pos, *n;
1120         struct obd_export *exp;
1121         int cnt = 0;
1122         ENTRY;
1123
1124         CFS_INIT_LIST_HEAD(&work_list);
1125         spin_lock(&obd->obd_dev_lock);
1126         list_for_each_safe(pos, n, &obd->obd_exports) {
1127                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1128                 if (test_export(exp))
1129                         continue;
1130
1131                 list_del(&exp->exp_obd_chain);
1132                 list_add(&exp->exp_obd_chain, &work_list);
1133                 /* don't count self-export as client */
1134                 if (obd_uuid_equals(&exp->exp_client_uuid,
1135                                      &exp->exp_obd->obd_uuid))
1136                         continue;
1137
1138                 cnt++;
1139                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1140                        obd->obd_name, exp->exp_client_uuid.uuid,
1141                        exp->exp_connection == NULL ? "<unknown>" :
1142                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1143         }
1144         spin_unlock(&obd->obd_dev_lock);
1145
1146         CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
1147                obd->obd_name, cnt);
1148         class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
1149         RETURN(cnt);
1150 }
1151 EXPORT_SYMBOL(class_disconnect_stale_exports);
1152
1153 int oig_init(struct obd_io_group **oig_out)
1154 {
1155         struct obd_io_group *oig;
1156         ENTRY;
1157
1158         OBD_ALLOC(oig, sizeof(*oig));
1159         if (oig == NULL)
1160                 RETURN(-ENOMEM);
1161
1162         spin_lock_init(&oig->oig_lock);
1163         oig->oig_rc = 0;
1164         oig->oig_pending = 0;
1165         atomic_set(&oig->oig_refcount, 1);
1166         cfs_waitq_init(&oig->oig_waitq);
1167         CFS_INIT_LIST_HEAD(&oig->oig_occ_list);
1168
1169         *oig_out = oig;
1170         RETURN(0);
1171 };
1172 EXPORT_SYMBOL(oig_init);
1173
1174 static inline void oig_grab(struct obd_io_group *oig)
1175 {
1176         atomic_inc(&oig->oig_refcount);
1177 }
1178
1179 void oig_release(struct obd_io_group *oig)
1180 {
1181         if (atomic_dec_and_test(&oig->oig_refcount))
1182                 OBD_FREE(oig, sizeof(*oig));
1183 }
1184 EXPORT_SYMBOL(oig_release);
1185
1186 int oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ)
1187 {
1188         int rc = 0;
1189         CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
1190         spin_lock(&oig->oig_lock);
1191         if (oig->oig_rc) {
1192                 rc = oig->oig_rc;
1193         } else {
1194                 oig->oig_pending++;
1195                 if (occ != NULL)
1196                         list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
1197         }
1198         spin_unlock(&oig->oig_lock);
1199         oig_grab(oig);
1200
1201         return rc;
1202 }
1203 EXPORT_SYMBOL(oig_add_one);
1204
1205 void oig_complete_one(struct obd_io_group *oig,
1206                       struct oig_callback_context *occ, int rc)
1207 {
1208         cfs_waitq_t *wake = NULL;
1209         int old_rc;
1210
1211         spin_lock(&oig->oig_lock);
1212
1213         if (occ != NULL)
1214                 list_del_init(&occ->occ_oig_item);
1215
1216         old_rc = oig->oig_rc;
1217         if (oig->oig_rc == 0 && rc != 0)
1218                 oig->oig_rc = rc;
1219
1220         if (--oig->oig_pending <= 0)
1221                 wake = &oig->oig_waitq;
1222
1223         spin_unlock(&oig->oig_lock);
1224
1225         CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
1226                         "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
1227                         oig->oig_pending);
1228         if (wake)
1229                 cfs_waitq_signal(wake);
1230         oig_release(oig);
1231 }
1232 EXPORT_SYMBOL(oig_complete_one);
1233
1234 static int oig_done(struct obd_io_group *oig)
1235 {
1236         int rc = 0;
1237         spin_lock(&oig->oig_lock);
1238         if (oig->oig_pending <= 0)
1239                 rc = 1;
1240         spin_unlock(&oig->oig_lock);
1241         return rc;
1242 }
1243
1244 static void interrupted_oig(void *data)
1245 {
1246         struct obd_io_group *oig = data;
1247         struct oig_callback_context *occ;
1248
1249         spin_lock(&oig->oig_lock);
1250         /* We need to restart the processing each time we drop the lock, as
1251          * it is possible other threads called oig_complete_one() to remove
1252          * an entry elsewhere in the list while we dropped lock.  We need to
1253          * drop the lock because osc_ap_completion() calls oig_complete_one()
1254          * which re-gets this lock ;-) as well as a lock ordering issue. */
1255 restart:
1256         list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
1257                 if (occ->interrupted)
1258                         continue;
1259                 occ->interrupted = 1;
1260                 spin_unlock(&oig->oig_lock);
1261                 occ->occ_interrupted(occ);
1262                 spin_lock(&oig->oig_lock);
1263                 goto restart;
1264         }
1265         spin_unlock(&oig->oig_lock);
1266 }
1267
1268 int oig_wait(struct obd_io_group *oig)
1269 {
1270         struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
1271         int rc;
1272
1273         CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
1274
1275         do {
1276                 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
1277                 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
1278                 /* we can't continue until the oig has emptied and stopped
1279                  * referencing state that the caller will free upon return */
1280                 if (rc == -EINTR)
1281                         lwi = (struct l_wait_info){ 0, };
1282         } while (rc == -EINTR);
1283
1284         LASSERTF(oig->oig_pending == 0,
1285                  "exiting oig_wait(oig = %p) with %d pending\n", oig,
1286                  oig->oig_pending);
1287
1288         CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
1289         return oig->oig_rc;
1290 }
1291 EXPORT_SYMBOL(oig_wait);
1292
1293 void class_fail_export(struct obd_export *exp)
1294 {
1295         int rc, already_failed;
1296
1297         spin_lock(&exp->exp_lock);
1298         already_failed = exp->exp_failed;
1299         exp->exp_failed = 1;
1300         spin_unlock(&exp->exp_lock);
1301
1302         if (already_failed) {
1303                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1304                        exp, exp->exp_client_uuid.uuid);
1305                 return;
1306         }
1307
1308         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1309                exp, exp->exp_client_uuid.uuid);
1310
1311         if (obd_dump_on_timeout)
1312                 libcfs_debug_dumplog();
1313
1314         /* Most callers into obd_disconnect are removing their own reference
1315          * (request, for example) in addition to the one from the hash table.
1316          * We don't have such a reference here, so make one. */
1317         class_export_get(exp);
1318         rc = obd_disconnect(exp);
1319         if (rc)
1320                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1321         else
1322                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1323                        exp, exp->exp_client_uuid.uuid);
1324 }
1325 EXPORT_SYMBOL(class_fail_export);
1326
1327 char *obd_export_nid2str(struct obd_export *exp)
1328 {
1329         if (exp->exp_connection != NULL)
1330                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1331
1332         return "(no nid)";
1333 }
1334 EXPORT_SYMBOL(obd_export_nid2str);
1335
1336 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1337 {
1338         struct obd_export *doomed_exp = NULL;
1339         int exports_evicted = 0;
1340
1341         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1342
1343         do {
1344                 doomed_exp = lustre_hash_get_object_by_key(obd->obd_nid_hash_body,
1345                                                            &nid_key);
1346                 if (doomed_exp == NULL)
1347                         break;
1348
1349                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1350                          "nid %s found, wanted nid %s, requested nid %s\n",
1351                          obd_export_nid2str(doomed_exp),
1352                          libcfs_nid2str(nid_key), nid);
1353                 LASSERTF(doomed_exp != obd->obd_self_export,
1354                          "self-export is hashed by NID?\n");
1355                 exports_evicted++;
1356                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1357                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1358                        exports_evicted);
1359                 class_fail_export(doomed_exp);
1360                 class_export_put(doomed_exp);
1361         } while (1);
1362
1363         if (!exports_evicted)
1364                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1365                        obd->obd_name, nid);
1366         return exports_evicted;
1367 }
1368 EXPORT_SYMBOL(obd_export_evict_by_nid);
1369
1370 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1371 {
1372         struct obd_export *doomed_exp = NULL;
1373         struct obd_uuid doomed;
1374         int exports_evicted = 0;
1375
1376         obd_str2uuid(&doomed, uuid);
1377         if (obd_uuid_equals(&doomed, &obd->obd_uuid)) {
1378                 CERROR("%s: can't evict myself\n", obd->obd_name);
1379                 return exports_evicted;
1380         }
1381
1382         doomed_exp = lustre_hash_get_object_by_key(obd->obd_uuid_hash_body,
1383                                                    &doomed);
1384
1385         if (doomed_exp == NULL) {
1386                 CERROR("%s: can't disconnect %s: no exports found\n",
1387                        obd->obd_name, uuid);
1388         } else {
1389                 CWARN("%s: evicting %s at adminstrative request\n",
1390                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1391                 class_fail_export(doomed_exp);
1392                 class_export_put(doomed_exp);
1393                 exports_evicted++;
1394         }
1395
1396         return exports_evicted;
1397 }
1398 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1399
1400 /**
1401  * kill zombie imports and exports
1402  */
1403 void obd_zombie_impexp_cull(void)
1404 {
1405         struct obd_import *import;
1406         struct obd_export *export;
1407         ENTRY;
1408
1409         do {
1410                 spin_lock (&obd_zombie_impexp_lock);
1411
1412                 import = NULL;
1413                 if (!list_empty(&obd_zombie_imports)) {
1414                         import = list_entry(obd_zombie_imports.next,
1415                                             struct obd_import,
1416                                             imp_zombie_chain);
1417                         list_del(&import->imp_zombie_chain);
1418                 }
1419
1420                 export = NULL;
1421                 if (!list_empty(&obd_zombie_exports)) {
1422                         export = list_entry(obd_zombie_exports.next,
1423                                             struct obd_export,
1424                                             exp_obd_chain);
1425                         list_del_init(&export->exp_obd_chain);
1426                 }
1427
1428                 spin_unlock(&obd_zombie_impexp_lock);
1429
1430                 if (import != NULL)
1431                         class_import_destroy(import);
1432
1433                 if (export != NULL)
1434                         class_export_destroy(export);
1435
1436         } while (import != NULL || export != NULL);
1437         EXIT;
1438 }
1439
1440 static struct completion        obd_zombie_start;
1441 static struct completion        obd_zombie_stop;
1442 static unsigned long            obd_zombie_flags;
1443 static cfs_waitq_t              obd_zombie_waitq;
1444
1445 enum {
1446         OBD_ZOMBIE_STOP = 1
1447 };
1448
1449 /**
1450  * check for work for kill zombie import/export thread.
1451  */
1452 int obd_zombie_impexp_check(void *arg)
1453 {
1454         int rc;
1455
1456         spin_lock(&obd_zombie_impexp_lock);
1457         rc = list_empty(&obd_zombie_imports) &&
1458              list_empty(&obd_zombie_exports) &&
1459              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1460
1461         spin_unlock(&obd_zombie_impexp_lock);
1462
1463         RETURN(rc);
1464 }
1465
1466 /**
1467  * notify import/export destroy thread about new zombie.
1468  */
1469 static void obd_zombie_impexp_notify(void)
1470 {
1471         cfs_waitq_signal(&obd_zombie_waitq);
1472 }
1473
1474 #ifdef __KERNEL__
1475
1476 /**
1477  * destroy zombie export/import thread.
1478  */
1479 static int obd_zombie_impexp_thread(void *unused)
1480 {
1481         int rc;
1482
1483         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1484                 complete(&obd_zombie_start);
1485                 RETURN(rc);
1486         }
1487
1488         complete(&obd_zombie_start);
1489
1490         while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1491                 struct l_wait_info lwi = { 0 };
1492
1493                 l_wait_event(obd_zombie_waitq, !obd_zombie_impexp_check(NULL), &lwi);
1494
1495                 obd_zombie_impexp_cull();
1496         }
1497
1498         complete(&obd_zombie_stop);
1499
1500         RETURN(0);
1501 }
1502
1503 #else /* ! KERNEL */
1504
1505 static atomic_t zombie_recur = ATOMIC_INIT(0);
1506 static void *obd_zombie_impexp_work_cb;
1507 static void *obd_zombie_impexp_idle_cb;
1508
1509 int obd_zombie_impexp_kill(void *arg)
1510 {
1511         int rc = 0;
1512
1513         if (atomic_inc_return(&zombie_recur) == 1) {
1514                 obd_zombie_impexp_cull();
1515                 rc = 1;
1516         }
1517         atomic_dec(&zombie_recur);
1518         return rc;
1519 }
1520
1521 #endif
1522
1523 /**
1524  * start destroy zombie import/export thread
1525  */
1526 int obd_zombie_impexp_init(void)
1527 {
1528         int rc;
1529
1530         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1531         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1532         spin_lock_init(&obd_zombie_impexp_lock);
1533         init_completion(&obd_zombie_start);
1534         init_completion(&obd_zombie_stop);
1535         cfs_waitq_init(&obd_zombie_waitq);
1536
1537 #ifdef __KERNEL__
1538         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1539         if (rc < 0)
1540                 RETURN(rc);
1541
1542         wait_for_completion(&obd_zombie_start);
1543 #else
1544
1545         obd_zombie_impexp_work_cb =
1546                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1547                                                  &obd_zombie_impexp_kill, NULL);
1548
1549         obd_zombie_impexp_idle_cb =
1550                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1551                                                  &obd_zombie_impexp_check, NULL);
1552         rc = 0;
1553
1554 #endif
1555         RETURN(rc);
1556 }
1557 /**
1558  * stop destroy zombie import/export thread
1559  */
1560 void obd_zombie_impexp_stop(void)
1561 {
1562         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1563         obd_zombie_impexp_notify();
1564 #ifdef __KERNEL__
1565         wait_for_completion(&obd_zombie_stop);
1566 #else
1567         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1568         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1569 #endif
1570 }
1571