Whamcloud - gitweb
3eb7073b8b7df2cd0e0a9a666767d19bb1e9baf0
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
50
51 extern struct list_head obd_types;
52 spinlock_t obd_types_lock;
53
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
58
59 struct list_head  obd_zombie_imports;
60 struct list_head  obd_zombie_exports;
61 spinlock_t        obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
63
64 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
65
66 /*
67  * support functions: we could use inter-module communication, but this
68  * is more portable to other OS's
69  */
70 static struct obd_device *obd_device_alloc(void)
71 {
72         struct obd_device *obd;
73
74         OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
75         if (obd != NULL) {
76                 obd->obd_magic = OBD_DEVICE_MAGIC;
77         }
78         return obd;
79 }
80 EXPORT_SYMBOL(obd_device_alloc);
81
82 static void obd_device_free(struct obd_device *obd)
83 {
84         LASSERT(obd != NULL);
85         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
86                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
87         if (obd->obd_namespace != NULL) {
88                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89                        obd, obd->obd_namespace, obd->obd_force);
90                 LBUG();
91         }
92         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
93 }
94 EXPORT_SYMBOL(obd_device_free);
95
96 struct obd_type *class_search_type(const char *name)
97 {
98         struct list_head *tmp;
99         struct obd_type *type;
100
101         spin_lock(&obd_types_lock);
102         list_for_each(tmp, &obd_types) {
103                 type = list_entry(tmp, struct obd_type, typ_chain);
104                 if (strcmp(type->typ_name, name) == 0) {
105                         spin_unlock(&obd_types_lock);
106                         return type;
107                 }
108         }
109         spin_unlock(&obd_types_lock);
110         return NULL;
111 }
112
113 struct obd_type *class_get_type(const char *name)
114 {
115         struct obd_type *type = class_search_type(name);
116
117 #ifdef CONFIG_KMOD
118         if (!type) {
119                 const char *modname = name;
120                 if (!request_module(modname)) {
121                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
122                         type = class_search_type(name);
123                 } else {
124                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
125                                            modname);
126                 }
127         }
128 #endif
129         if (type) {
130                 spin_lock(&type->obd_type_lock);
131                 type->typ_refcnt++;
132                 try_module_get(type->typ_dt_ops->o_owner);
133                 spin_unlock(&type->obd_type_lock);
134         }
135         return type;
136 }
137
138 void class_put_type(struct obd_type *type)
139 {
140         LASSERT(type);
141         spin_lock(&type->obd_type_lock);
142         type->typ_refcnt--;
143         module_put(type->typ_dt_ops->o_owner);
144         spin_unlock(&type->obd_type_lock);
145 }
146
147 #define CLASS_MAX_NAME 1024
148
149 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
150                         struct lprocfs_vars *vars, const char *name,
151                         struct lu_device_type *ldt)
152 {
153         struct obd_type *type;
154         int rc = 0;
155         ENTRY;
156
157         /* sanity check */
158         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
159
160         if (class_search_type(name)) {
161                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
162                 RETURN(-EEXIST);
163         }
164
165         rc = -ENOMEM;
166         OBD_ALLOC(type, sizeof(*type));
167         if (type == NULL)
168                 RETURN(rc);
169
170         OBD_ALLOC_PTR(type->typ_dt_ops);
171         OBD_ALLOC_PTR(type->typ_md_ops);
172         OBD_ALLOC(type->typ_name, strlen(name) + 1);
173
174         if (type->typ_dt_ops == NULL ||
175             type->typ_md_ops == NULL ||
176             type->typ_name == NULL)
177                 GOTO (failed, rc);
178
179         *(type->typ_dt_ops) = *dt_ops;
180         /* md_ops is optional */
181         if (md_ops)
182                 *(type->typ_md_ops) = *md_ops;
183         strcpy(type->typ_name, name);
184         spin_lock_init(&type->obd_type_lock);
185
186 #ifdef LPROCFS
187         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
188                                               vars, type);
189         if (IS_ERR(type->typ_procroot)) {
190                 rc = PTR_ERR(type->typ_procroot);
191                 type->typ_procroot = NULL;
192                 GOTO (failed, rc);
193         }
194 #endif
195         if (ldt != NULL) {
196                 type->typ_lu = ldt;
197                 rc = ldt->ldt_ops->ldto_init(ldt);
198                 if (rc != 0)
199                         GOTO (failed, rc);
200         }
201
202         spin_lock(&obd_types_lock);
203         list_add(&type->typ_chain, &obd_types);
204         spin_unlock(&obd_types_lock);
205
206         RETURN (0);
207
208  failed:
209         if (type->typ_name != NULL)
210                 OBD_FREE(type->typ_name, strlen(name) + 1);
211         if (type->typ_md_ops != NULL)
212                 OBD_FREE_PTR(type->typ_md_ops);
213         if (type->typ_dt_ops != NULL)
214                 OBD_FREE_PTR(type->typ_dt_ops);
215         OBD_FREE(type, sizeof(*type));
216         RETURN(rc);
217 }
218
219 int class_unregister_type(const char *name)
220 {
221         struct obd_type *type = class_search_type(name);
222         ENTRY;
223
224         if (!type) {
225                 CERROR("unknown obd type\n");
226                 RETURN(-EINVAL);
227         }
228
229         if (type->typ_refcnt) {
230                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
231                 /* This is a bad situation, let's make the best of it */
232                 /* Remove ops, but leave the name for debugging */
233                 OBD_FREE_PTR(type->typ_dt_ops);
234                 OBD_FREE_PTR(type->typ_md_ops);
235                 RETURN(-EBUSY);
236         }
237
238         if (type->typ_procroot) {
239                 lprocfs_remove(&type->typ_procroot);
240         }
241
242         if (type->typ_lu)
243                 type->typ_lu->ldt_ops->ldto_fini(type->typ_lu);
244
245         spin_lock(&obd_types_lock);
246         list_del(&type->typ_chain);
247         spin_unlock(&obd_types_lock);
248         OBD_FREE(type->typ_name, strlen(name) + 1);
249         if (type->typ_dt_ops != NULL)
250                 OBD_FREE_PTR(type->typ_dt_ops);
251         if (type->typ_md_ops != NULL)
252                 OBD_FREE_PTR(type->typ_md_ops);
253         OBD_FREE(type, sizeof(*type));
254         RETURN(0);
255 } /* class_unregister_type */
256
257 /**
258  * Create a new obd device.
259  *
260  * Find an empty slot in ::obd_devs[], create a new obd device in it.
261  *
262  * \param typename [in] obd device type string.
263  * \param name     [in] obd device name.
264  *
265  * \retval NULL if create fails, otherwise return the obd device
266  *         pointer created.
267  */
268 struct obd_device *class_newdev(const char *type_name, const char *name)
269 {
270         struct obd_device *result = NULL;
271         struct obd_device *newdev;
272         struct obd_type *type = NULL;
273         int i;
274         int new_obd_minor = 0;
275
276         if (strlen(name) >= MAX_OBD_NAME) {
277                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
278                 RETURN(ERR_PTR(-EINVAL));
279         }
280
281         type = class_get_type(type_name);
282         if (type == NULL){
283                 CERROR("OBD: unknown type: %s\n", type_name);
284                 RETURN(ERR_PTR(-ENODEV));
285         }
286
287         newdev = obd_device_alloc();
288         if (newdev == NULL) {
289                 class_put_type(type);
290                 RETURN(ERR_PTR(-ENOMEM));
291         }
292         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
293
294         spin_lock(&obd_dev_lock);
295         for (i = 0; i < class_devno_max(); i++) {
296                 struct obd_device *obd = class_num2obd(i);
297                 if (obd && obd->obd_name &&
298                     (strcmp(name, obd->obd_name) == 0)) {
299                         CERROR("Device %s already exists, won't add\n", name);
300                         if (result) {
301                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
302                                          "%p obd_magic %08x != %08x\n", result,
303                                          result->obd_magic, OBD_DEVICE_MAGIC);
304                                 LASSERTF(result->obd_minor == new_obd_minor,
305                                          "%p obd_minor %d != %d\n", result,
306                                          result->obd_minor, new_obd_minor);
307
308                                 obd_devs[result->obd_minor] = NULL;
309                                 result->obd_name[0]='\0';
310                          }
311                         result = ERR_PTR(-EEXIST);
312                         break;
313                 }
314                 if (!result && !obd) {
315                         result = newdev;
316                         result->obd_minor = i;
317                         new_obd_minor = i;
318                         result->obd_type = type;
319                         strncpy(result->obd_name, name,
320                                 sizeof(result->obd_name) - 1);
321                         obd_devs[i] = result;
322                 }
323         }
324         spin_unlock(&obd_dev_lock);
325
326         if (result == NULL && i >= class_devno_max()) {
327                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
328                        class_devno_max());
329                 result = ERR_PTR(-EOVERFLOW);
330         }
331
332         if (IS_ERR(result)) {
333                 obd_device_free(newdev);
334                 class_put_type(type);
335         } else {
336                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
337                        result->obd_name, result);
338         }
339         return result;
340 }
341
342 void class_release_dev(struct obd_device *obd)
343 {
344         struct obd_type *obd_type = obd->obd_type;
345
346         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
347                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
348         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
349                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
350         LASSERT(obd_type != NULL);
351
352         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
353                obd->obd_name,obd->obd_type->typ_name);
354
355         spin_lock(&obd_dev_lock);
356         obd_devs[obd->obd_minor] = NULL;
357         spin_unlock(&obd_dev_lock);
358         obd_device_free(obd);
359
360         class_put_type(obd_type);
361 }
362
363 int class_name2dev(const char *name)
364 {
365         int i;
366
367         if (!name)
368                 return -1;
369
370         spin_lock(&obd_dev_lock);
371         for (i = 0; i < class_devno_max(); i++) {
372                 struct obd_device *obd = class_num2obd(i);
373                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
374                         /* Make sure we finished attaching before we give
375                            out any references */
376                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
377                         if (obd->obd_attached) {
378                                 spin_unlock(&obd_dev_lock);
379                                 return i;
380                         }
381                         break;
382                 }
383         }
384         spin_unlock(&obd_dev_lock);
385
386         return -1;
387 }
388
389 struct obd_device *class_name2obd(const char *name)
390 {
391         int dev = class_name2dev(name);
392
393         if (dev < 0 || dev > class_devno_max())
394                 return NULL;
395         return class_num2obd(dev);
396 }
397
398 int class_uuid2dev(struct obd_uuid *uuid)
399 {
400         int i;
401
402         spin_lock(&obd_dev_lock);
403         for (i = 0; i < class_devno_max(); i++) {
404                 struct obd_device *obd = class_num2obd(i);
405                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
406                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
407                         spin_unlock(&obd_dev_lock);
408                         return i;
409                 }
410         }
411         spin_unlock(&obd_dev_lock);
412
413         return -1;
414 }
415
416 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
417 {
418         int dev = class_uuid2dev(uuid);
419         if (dev < 0)
420                 return NULL;
421         return class_num2obd(dev);
422 }
423
424 /**
425  * Get obd device from ::obd_devs[]
426  *
427  * \param num [in] array index
428  *
429  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
430  *         otherwise return the obd device there.
431  */
432 struct obd_device *class_num2obd(int num)
433 {
434         struct obd_device *obd = NULL;
435
436         if (num < class_devno_max()) {
437                 obd = obd_devs[num];
438                 if (obd == NULL)
439                         return NULL;
440
441                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
442                          "%p obd_magic %08x != %08x\n",
443                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
444                 LASSERTF(obd->obd_minor == num,
445                          "%p obd_minor %0d != %0d\n",
446                          obd, obd->obd_minor, num);
447         }
448
449         return obd;
450 }
451
452 void class_obd_list(void)
453 {
454         char *status;
455         int i;
456
457         spin_lock(&obd_dev_lock);
458         for (i = 0; i < class_devno_max(); i++) {
459                 struct obd_device *obd = class_num2obd(i);
460                 if (obd == NULL)
461                         continue;
462                 if (obd->obd_stopping)
463                         status = "ST";
464                 else if (obd->obd_set_up)
465                         status = "UP";
466                 else if (obd->obd_attached)
467                         status = "AT";
468                 else
469                         status = "--";
470                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
471                          i, status, obd->obd_type->typ_name,
472                          obd->obd_name, obd->obd_uuid.uuid,
473                          atomic_read(&obd->obd_refcount));
474         }
475         spin_unlock(&obd_dev_lock);
476         return;
477 }
478
479 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
480    specified, then only the client with that uuid is returned,
481    otherwise any client connected to the tgt is returned. */
482 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
483                                           const char * typ_name,
484                                           struct obd_uuid *grp_uuid)
485 {
486         int i;
487
488         spin_lock(&obd_dev_lock);
489         for (i = 0; i < class_devno_max(); i++) {
490                 struct obd_device *obd = class_num2obd(i);
491                 if (obd == NULL)
492                         continue;
493                 if ((strncmp(obd->obd_type->typ_name, typ_name,
494                              strlen(typ_name)) == 0)) {
495                         if (obd_uuid_equals(tgt_uuid,
496                                             &obd->u.cli.cl_target_uuid) &&
497                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
498                                                          &obd->obd_uuid) : 1)) {
499                                 spin_unlock(&obd_dev_lock);
500                                 return obd;
501                         }
502                 }
503         }
504         spin_unlock(&obd_dev_lock);
505
506         return NULL;
507 }
508
509 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
510                                             struct obd_uuid *grp_uuid)
511 {
512         struct obd_device *obd;
513
514         obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
515         if (!obd)
516                 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
517                                             grp_uuid);
518         return obd;
519 }
520
521 /* Iterate the obd_device list looking devices have grp_uuid. Start
522    searching at *next, and if a device is found, the next index to look
523    at is saved in *next. If next is NULL, then the first matching device
524    will always be returned. */
525 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
526 {
527         int i;
528
529         if (next == NULL)
530                 i = 0;
531         else if (*next >= 0 && *next < class_devno_max())
532                 i = *next;
533         else
534                 return NULL;
535
536         spin_lock(&obd_dev_lock);
537         for (; i < class_devno_max(); i++) {
538                 struct obd_device *obd = class_num2obd(i);
539                 if (obd == NULL)
540                         continue;
541                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
542                         if (next != NULL)
543                                 *next = i+1;
544                         spin_unlock(&obd_dev_lock);
545                         return obd;
546                 }
547         }
548         spin_unlock(&obd_dev_lock);
549
550         return NULL;
551 }
552
553
554 void obd_cleanup_caches(void)
555 {
556         int rc;
557
558         ENTRY;
559         if (obd_device_cachep) {
560                 rc = cfs_mem_cache_destroy(obd_device_cachep);
561                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
562                 obd_device_cachep = NULL;
563         }
564         if (obdo_cachep) {
565                 rc = cfs_mem_cache_destroy(obdo_cachep);
566                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
567                 obdo_cachep = NULL;
568         }
569         if (import_cachep) {
570                 rc = cfs_mem_cache_destroy(import_cachep);
571                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
572                 import_cachep = NULL;
573         }
574         if (capa_cachep) {
575                 rc = cfs_mem_cache_destroy(capa_cachep);
576                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
577                 capa_cachep = NULL;
578         }
579         EXIT;
580 }
581
582 int obd_init_caches(void)
583 {
584         ENTRY;
585
586         LASSERT(obd_device_cachep == NULL);
587         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
588                                                  sizeof(struct obd_device),
589                                                  0, 0);
590         if (!obd_device_cachep)
591                 GOTO(out, -ENOMEM);
592
593         LASSERT(obdo_cachep == NULL);
594         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
595                                            0, 0);
596         if (!obdo_cachep)
597                 GOTO(out, -ENOMEM);
598
599         LASSERT(import_cachep == NULL);
600         import_cachep = cfs_mem_cache_create("ll_import_cache",
601                                              sizeof(struct obd_import),
602                                              0, 0);
603         if (!import_cachep)
604                 GOTO(out, -ENOMEM);
605
606         LASSERT(capa_cachep == NULL);
607         capa_cachep = cfs_mem_cache_create("capa_cache",
608                                            sizeof(struct obd_capa), 0, 0);
609         if (!capa_cachep)
610                 GOTO(out, -ENOMEM);
611
612         RETURN(0);
613  out:
614         obd_cleanup_caches();
615         RETURN(-ENOMEM);
616
617 }
618
619 /* map connection to client */
620 struct obd_export *class_conn2export(struct lustre_handle *conn)
621 {
622         struct obd_export *export;
623         ENTRY;
624
625         if (!conn) {
626                 CDEBUG(D_CACHE, "looking for null handle\n");
627                 RETURN(NULL);
628         }
629
630         if (conn->cookie == -1) {  /* this means assign a new connection */
631                 CDEBUG(D_CACHE, "want a new connection\n");
632                 RETURN(NULL);
633         }
634
635         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
636         export = class_handle2object(conn->cookie);
637         RETURN(export);
638 }
639
640 struct obd_device *class_exp2obd(struct obd_export *exp)
641 {
642         if (exp)
643                 return exp->exp_obd;
644         return NULL;
645 }
646
647 struct obd_device *class_conn2obd(struct lustre_handle *conn)
648 {
649         struct obd_export *export;
650         export = class_conn2export(conn);
651         if (export) {
652                 struct obd_device *obd = export->exp_obd;
653                 class_export_put(export);
654                 return obd;
655         }
656         return NULL;
657 }
658
659 struct obd_import *class_exp2cliimp(struct obd_export *exp)
660 {
661         struct obd_device *obd = exp->exp_obd;
662         if (obd == NULL)
663                 return NULL;
664         return obd->u.cli.cl_import;
665 }
666
667 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
668 {
669         struct obd_device *obd = class_conn2obd(conn);
670         if (obd == NULL)
671                 return NULL;
672         return obd->u.cli.cl_import;
673 }
674
675 /* Export management functions */
/* addref callback handed to class_handle_hash() for exports: takes one
 * reference on the export passed in as an untyped pointer. */
static void export_handle_addref(void *export)
{
        class_export_get(export);
}
680
681 void __class_export_put(struct obd_export *exp)
682 {
683         if (atomic_dec_and_test(&exp->exp_refcount)) {
684                 LASSERT (list_empty(&exp->exp_obd_chain));
685
686                 CDEBUG(D_IOCTL, "final put %p/%s\n",
687                        exp, exp->exp_client_uuid.uuid);
688
689                 spin_lock(&obd_zombie_impexp_lock);
690                 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
691                 spin_unlock(&obd_zombie_impexp_lock);
692
693                 if (obd_zombie_impexp_notify != NULL)
694                         obd_zombie_impexp_notify();
695         }
696 }
697 EXPORT_SYMBOL(__class_export_put);
698
699 void class_export_destroy(struct obd_export *exp)
700 {
701         struct obd_device *obd = exp->exp_obd;
702         ENTRY;
703
704         LASSERT (atomic_read(&exp->exp_refcount) == 0);
705
706         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
707                exp->exp_client_uuid.uuid, obd->obd_name);
708
709         LASSERT(obd != NULL);
710
711         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
712         if (exp->exp_connection)
713                 ptlrpc_put_connection_superhack(exp->exp_connection);
714
715         LASSERT(list_empty(&exp->exp_outstanding_replies));
716         LASSERT(list_empty(&exp->exp_req_replay_queue));
717         obd_destroy_export(exp);
718
719         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
720         class_decref(obd);
721         EXIT;
722 }
723
724 /* Creates a new export, adds it to the hash table, and returns a
725  * pointer to it. The refcount is 2: one for the hash reference, and
726  * one for the pointer returned by this function. */
727 struct obd_export *class_new_export(struct obd_device *obd,
728                                     struct obd_uuid *cluuid)
729 {
730         struct obd_export *export;
731         int rc = 0;
732
733         OBD_ALLOC_PTR(export);
734         if (!export)
735                 return ERR_PTR(-ENOMEM);
736
737         export->exp_conn_cnt = 0;
738         atomic_set(&export->exp_refcount, 2);
739         atomic_set(&export->exp_rpc_count, 0);
740         export->exp_obd = obd;
741         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
742         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
743         /* XXX this should be in LDLM init */
744         CFS_INIT_LIST_HEAD(&export->exp_ldlm_data.led_held_locks);
745         spin_lock_init(&export->exp_ldlm_data.led_lock);
746
747         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
748         class_handle_hash(&export->exp_handle, export_handle_addref);
749         export->exp_last_request_time = cfs_time_current_sec();
750         spin_lock_init(&export->exp_lock);
751         INIT_HLIST_NODE(&export->exp_uuid_hash);
752         INIT_HLIST_NODE(&export->exp_nid_hash);
753
754         export->exp_sp_peer = LUSTRE_SP_ANY;
755         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
756         export->exp_client_uuid = *cluuid;
757         obd_init_export(export);
758
759         spin_lock(&obd->obd_dev_lock);
760         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
761                 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
762                                             &export->exp_uuid_hash);
763                 if (rc != 0) {
764                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
765                                       obd->obd_name, cluuid->uuid, rc);
766                         spin_unlock(&obd->obd_dev_lock);
767                         class_handle_unhash(&export->exp_handle);
768                         OBD_FREE_PTR(export);
769                         return ERR_PTR(-EALREADY);
770                 }
771         }
772
773         LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
774         class_incref(obd);
775         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
776         list_add_tail(&export->exp_obd_chain_timed,
777                       &export->exp_obd->obd_exports_timed);
778         export->exp_obd->obd_num_exports++;
779         spin_unlock(&obd->obd_dev_lock);
780
781         return export;
782 }
783 EXPORT_SYMBOL(class_new_export);
784
785 void class_unlink_export(struct obd_export *exp)
786 {
787         class_handle_unhash(&exp->exp_handle);
788
789         spin_lock(&exp->exp_obd->obd_dev_lock);
790         /* delete an uuid-export hashitem from hashtables */
791         if (!hlist_unhashed(&exp->exp_uuid_hash))
792                 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
793                                 &exp->exp_client_uuid,
794                                 &exp->exp_uuid_hash);
795
796         list_del_init(&exp->exp_obd_chain);
797         list_del_init(&exp->exp_obd_chain_timed);
798         exp->exp_obd->obd_num_exports--;
799         spin_unlock(&exp->exp_obd->obd_dev_lock);
800
801         class_export_put(exp);
802 }
803 EXPORT_SYMBOL(class_unlink_export);
804
805 /* Import management functions */
/* addref callback handed to class_handle_hash() for imports: takes one
 * reference on the import passed in as an untyped pointer. */
static void import_handle_addref(void *import)
{
        class_import_get(import);
}
810
811 struct obd_import *class_import_get(struct obd_import *import)
812 {
813         LASSERT(atomic_read(&import->imp_refcount) >= 0);
814         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
815         atomic_inc(&import->imp_refcount);
816         CDEBUG(D_INFO, "import %p refcount=%d\n", import,
817                atomic_read(&import->imp_refcount));
818         return import;
819 }
820 EXPORT_SYMBOL(class_import_get);
821
822 void class_import_put(struct obd_import *import)
823 {
824         ENTRY;
825
826         LASSERT(atomic_read(&import->imp_refcount) > 0);
827         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
828         LASSERT(list_empty(&import->imp_zombie_chain));
829
830         CDEBUG(D_INFO, "import %p refcount=%d\n", import,
831                atomic_read(&import->imp_refcount) - 1);
832
833         if (atomic_dec_and_test(&import->imp_refcount)) {
834
835                 CDEBUG(D_INFO, "final put import %p\n", import);
836
837                 spin_lock(&obd_zombie_impexp_lock);
838                 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
839                 spin_unlock(&obd_zombie_impexp_lock);
840
841                 if (obd_zombie_impexp_notify != NULL)
842                         obd_zombie_impexp_notify();
843         }
844
845         EXIT;
846 }
847 EXPORT_SYMBOL(class_import_put);
848
849 void class_import_destroy(struct obd_import *import)
850 {
851         ENTRY;
852
853         CDEBUG(D_IOCTL, "destroying import %p for %s\n", import,
854                 import->imp_obd->obd_name);
855
856         LASSERT(atomic_read(&import->imp_refcount) == 0);
857
858         ptlrpc_put_connection_superhack(import->imp_connection);
859
860         while (!list_empty(&import->imp_conn_list)) {
861                 struct obd_import_conn *imp_conn;
862
863                 imp_conn = list_entry(import->imp_conn_list.next,
864                                       struct obd_import_conn, oic_item);
865                 list_del(&imp_conn->oic_item);
866                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
867                 OBD_FREE(imp_conn, sizeof(*imp_conn));
868         }
869
870         LASSERT(import->imp_sec == NULL);
871         class_decref(import->imp_obd);
872         OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
873         EXIT;
874 }
875
876 static void init_imp_at(struct imp_at *at) {
877         int i;
878         at_init(&at->iat_net_latency, 0, 0);
879         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
880                 /* max service estimates are tracked on the server side, so
881                    don't use the AT history here, just use the last reported
882                    val. (But keep hist for proc histogram, worst_ever) */
883                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
884                         AT_FLG_NOHIST);
885         }
886 }
887
888 struct obd_import *class_new_import(struct obd_device *obd)
889 {
890         struct obd_import *imp;
891
892         OBD_ALLOC(imp, sizeof(*imp));
893         if (imp == NULL)
894                 return NULL;
895
896         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
897         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
898         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
899         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
900         spin_lock_init(&imp->imp_lock);
901         imp->imp_last_success_conn = 0;
902         imp->imp_state = LUSTRE_IMP_NEW;
903         imp->imp_obd = class_incref(obd);
904         sema_init(&imp->imp_sec_mutex, 1);
905         cfs_waitq_init(&imp->imp_recovery_waitq);
906
907         atomic_set(&imp->imp_refcount, 2);
908         atomic_set(&imp->imp_inflight, 0);
909         atomic_set(&imp->imp_replay_inflight, 0);
910         atomic_set(&imp->imp_inval_count, 0);
911         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
912         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
913         class_handle_hash(&imp->imp_handle, import_handle_addref);
914         init_imp_at(&imp->imp_at);
915
916         /* the default magic is V2, will be used in connect RPC, and
917          * then adjusted according to the flags in request/reply. */
918         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
919
920         return imp;
921 }
922 EXPORT_SYMBOL(class_new_import);
923
924 void class_destroy_import(struct obd_import *import)
925 {
926         LASSERT(import != NULL);
927         LASSERT(import != LP_POISON);
928
929         class_handle_unhash(&import->imp_handle);
930
931         spin_lock(&import->imp_lock);
932         import->imp_generation++;
933         spin_unlock(&import->imp_lock);
934         class_import_put(import);
935 }
936 EXPORT_SYMBOL(class_destroy_import);
937
938 /* A connection defines an export context in which preallocation can
939    be managed. This releases the export pointer reference, and returns
940    the export handle, so the export refcount is 1 when this function
941    returns. */
942 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
943                   struct obd_uuid *cluuid)
944 {
945         struct obd_export *export;
946         LASSERT(conn != NULL);
947         LASSERT(obd != NULL);
948         LASSERT(cluuid != NULL);
949         ENTRY;
950
951         export = class_new_export(obd, cluuid);
952         if (IS_ERR(export))
953                 RETURN(PTR_ERR(export));
954
955         conn->cookie = export->exp_handle.h_cookie;
956         class_export_put(export);
957
958         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
959                cluuid->uuid, conn->cookie);
960         RETURN(0);
961 }
962 EXPORT_SYMBOL(class_connect);
963
964 /* if export is involved in recovery then clean up related things */
965 void class_export_recovery_cleanup(struct obd_export *exp)
966 {
967         struct obd_device *obd = exp->exp_obd;
968
969         spin_lock_bh(&obd->obd_processing_task_lock);
970         if (obd->obd_recovering && exp->exp_in_recovery) {
971                 spin_lock(&exp->exp_lock);
972                 exp->exp_in_recovery = 0;
973                 spin_unlock(&exp->exp_lock);
974                 obd->obd_connected_clients--;
975                 /* each connected client is counted as recoverable */
976                 obd->obd_recoverable_clients--;
977                 if (exp->exp_req_replay_needed) {
978                         spin_lock(&exp->exp_lock);
979                         exp->exp_req_replay_needed = 0;
980                         spin_unlock(&exp->exp_lock);
981                         LASSERT(atomic_read(&obd->obd_req_replay_clients));
982                         atomic_dec(&obd->obd_req_replay_clients);
983                 }
984                 if (exp->exp_lock_replay_needed) {
985                         spin_lock(&exp->exp_lock);
986                         exp->exp_lock_replay_needed = 0;
987                         spin_unlock(&exp->exp_lock);
988                         LASSERT(atomic_read(&obd->obd_lock_replay_clients));
989                         atomic_dec(&obd->obd_lock_replay_clients);
990                 }
991         }
992         spin_unlock_bh(&obd->obd_processing_task_lock);
993 }
994
995 /* This function removes two references from the export: one for the
996  * hash entry and one for the export pointer passed in.  The export
997  * pointer passed to this function is destroyed should not be used
998  * again. */
999 int class_disconnect(struct obd_export *export)
1000 {
1001         int already_disconnected;
1002         ENTRY;
1003
1004         if (export == NULL) {
1005                 fixme();
1006                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1007                 RETURN(-EINVAL);
1008         }
1009
1010         spin_lock(&export->exp_lock);
1011         already_disconnected = export->exp_disconnected;
1012         export->exp_disconnected = 1;
1013
1014         if (!hlist_unhashed(&export->exp_nid_hash))
1015                 lustre_hash_del(export->exp_obd->obd_nid_hash,
1016                                 &export->exp_connection->c_peer.nid,
1017                                 &export->exp_nid_hash);
1018
1019         spin_unlock(&export->exp_lock);
1020
1021         /* class_cleanup(), abort_recovery(), and class_fail_export()
1022          * all end up in here, and if any of them race we shouldn't
1023          * call extra class_export_puts(). */
1024         if (already_disconnected)
1025                 RETURN(0);
1026
1027         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1028                export->exp_handle.h_cookie);
1029
1030         class_export_recovery_cleanup(export);
1031         class_unlink_export(export);
1032         class_export_put(export);
1033         RETURN(0);
1034 }
1035
/**
 * Disconnect every export found on \a list.
 *
 * Each export has \a flags stored into exp_flags before it is handed to
 * obd_disconnect().  An export whose client uuid equals the uuid of its
 * own obd device (the self-export) is only removed from the list, not
 * disconnected.
 *
 * The list is walked without a lock; this function itself only unlinks
 * the self-export, so emptying the list presumably relies on the
 * disconnect path unlinking exp_obd_chain -- TODO confirm.
 */
static void class_disconnect_export_list(struct list_head *list, int flags)
{
        /* NOTE(review): rc is assigned below but never examined */
        int rc;
        struct lustre_handle fake_conn;
        struct obd_export *fake_exp, *exp;
        ENTRY;

        /* It's possible that an export may disconnect itself, but
         * nothing else will be added to this list. */
        while (!list_empty(list)) {
                exp = list_entry(list->next, struct obd_export, exp_obd_chain);
                /* pin the export for the duration of this iteration */
                class_export_get(exp);

                spin_lock(&exp->exp_lock);
                exp->exp_flags = flags;
                spin_unlock(&exp->exp_lock);

                if (obd_uuid_equals(&exp->exp_client_uuid,
                                    &exp->exp_obd->obd_uuid)) {
                        CDEBUG(D_HA,
                               "exp %p export uuid == obd uuid, don't discon\n",
                               exp);
                        /* Need to delete this now so we don't end up pointing
                         * to work_list later when this export is cleaned up. */
                        list_del_init(&exp->exp_obd_chain);
                        class_export_put(exp);
                        continue;
                }

                /* Look the export up again through its handle cookie;
                 * presumably this yields the extra reference that
                 * obd_disconnect() consumes -- TODO confirm. */
                fake_conn.cookie = exp->exp_handle.h_cookie;
                fake_exp = class_conn2export(&fake_conn);
                if (!fake_exp) {
                        /* NOTE(review): the export stays on the list here
                         * and is revisited on the next iteration;
                         * presumably another thread is concurrently
                         * unlinking it -- confirm this cannot spin. */
                        class_export_put(exp);
                        continue;
                }

                spin_lock(&fake_exp->exp_lock);
                fake_exp->exp_flags = flags;
                spin_unlock(&fake_exp->exp_lock);

                CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
                       "last request at "CFS_TIME_T"\n",
                       exp->exp_obd->obd_name, obd_export_nid2str(exp),
                       exp, exp->exp_last_request_time);
                rc = obd_disconnect(fake_exp);
                /* drop the reference taken at the top of the loop */
                class_export_put(exp);
        }
        EXIT;
}
1085
1086 static inline int get_exp_flags_from_obd(struct obd_device *obd)
1087 {
1088         return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
1089                 (obd->obd_force ? OBD_OPT_FORCE : 0));
1090 }
1091
1092 void class_disconnect_exports(struct obd_device *obd)
1093 {
1094         struct list_head work_list;
1095         ENTRY;
1096
1097         /* Move all of the exports from obd_exports to a work list, en masse. */
1098         spin_lock(&obd->obd_dev_lock);
1099         list_add(&work_list, &obd->obd_exports);
1100         list_del_init(&obd->obd_exports);
1101         spin_unlock(&obd->obd_dev_lock);
1102
1103         if (!list_empty(&work_list)) {
1104                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1105                        "disconnecting them\n", obd->obd_minor, obd);
1106                 class_disconnect_export_list(&work_list,
1107                                              get_exp_flags_from_obd(obd));
1108         } else
1109                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1110                        obd->obd_minor, obd);
1111         EXIT;
1112 }
1113 EXPORT_SYMBOL(class_disconnect_exports);
1114
1115 /* Remove exports that have not completed recovery.
1116  */
1117 int class_disconnect_stale_exports(struct obd_device *obd,
1118                                    int (*test_export)(struct obd_export *))
1119 {
1120         struct list_head work_list;
1121         struct list_head *pos, *n;
1122         struct obd_export *exp;
1123         int cnt = 0;
1124         ENTRY;
1125
1126         CFS_INIT_LIST_HEAD(&work_list);
1127         spin_lock(&obd->obd_dev_lock);
1128         list_for_each_safe(pos, n, &obd->obd_exports) {
1129                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1130                 if (test_export(exp))
1131                         continue;
1132
1133                 list_del(&exp->exp_obd_chain);
1134                 list_add(&exp->exp_obd_chain, &work_list);
1135                 /* don't count self-export as client */
1136                 if (obd_uuid_equals(&exp->exp_client_uuid,
1137                                      &exp->exp_obd->obd_uuid))
1138                         continue;
1139
1140                 cnt++;
1141                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1142                        obd->obd_name, exp->exp_client_uuid.uuid,
1143                        exp->exp_connection == NULL ? "<unknown>" :
1144                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1145         }
1146         spin_unlock(&obd->obd_dev_lock);
1147
1148         CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
1149                obd->obd_name, cnt);
1150         class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
1151         RETURN(cnt);
1152 }
1153 EXPORT_SYMBOL(class_disconnect_stale_exports);
1154
1155 int oig_init(struct obd_io_group **oig_out)
1156 {
1157         struct obd_io_group *oig;
1158         ENTRY;
1159
1160         OBD_ALLOC(oig, sizeof(*oig));
1161         if (oig == NULL)
1162                 RETURN(-ENOMEM);
1163
1164         spin_lock_init(&oig->oig_lock);
1165         oig->oig_rc = 0;
1166         oig->oig_pending = 0;
1167         atomic_set(&oig->oig_refcount, 1);
1168         cfs_waitq_init(&oig->oig_waitq);
1169         CFS_INIT_LIST_HEAD(&oig->oig_occ_list);
1170
1171         *oig_out = oig;
1172         RETURN(0);
1173 };
1174 EXPORT_SYMBOL(oig_init);
1175
/* Take an additional reference on an I/O group; paired with oig_release(). */
static inline void oig_grab(struct obd_io_group *oig)
{
        atomic_inc(&oig->oig_refcount);
}
1180
1181 void oig_release(struct obd_io_group *oig)
1182 {
1183         if (atomic_dec_and_test(&oig->oig_refcount))
1184                 OBD_FREE(oig, sizeof(*oig));
1185 }
1186 EXPORT_SYMBOL(oig_release);
1187
1188 int oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ)
1189 {
1190         int rc = 0;
1191         CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
1192         spin_lock(&oig->oig_lock);
1193         if (oig->oig_rc) {
1194                 rc = oig->oig_rc;
1195         } else {
1196                 oig->oig_pending++;
1197                 if (occ != NULL)
1198                         list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
1199         }
1200         spin_unlock(&oig->oig_lock);
1201         oig_grab(oig);
1202
1203         return rc;
1204 }
1205 EXPORT_SYMBOL(oig_add_one);
1206
/**
 * Record the completion of one piece of I/O in the group.
 *
 * Unlinks \a occ (if given) from the group's callback list, stores \a rc
 * as the group result if it is the first non-zero result seen, decrements
 * the pending count, wakes waiters once the count drops to zero or below,
 * and finally drops one group reference.
 */
void oig_complete_one(struct obd_io_group *oig,
                      struct oig_callback_context *occ, int rc)
{
        cfs_waitq_t *wake = NULL;
        int old_rc;

        spin_lock(&oig->oig_lock);

        if (occ != NULL)
                list_del_init(&occ->occ_oig_item);

        /* only the first error sticks */
        old_rc = oig->oig_rc;
        if (oig->oig_rc == 0 && rc != 0)
                oig->oig_rc = rc;

        /* decide under the lock whether to wake, signal after dropping it */
        if (--oig->oig_pending <= 0)
                wake = &oig->oig_waitq;

        spin_unlock(&oig->oig_lock);

        /* oig_rc and oig_pending are re-read here without the lock, hence
         * the "(racey)" in the message */
        CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
                        "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
                        oig->oig_pending);
        if (wake)
                cfs_waitq_signal(wake);
        /* the release comes last: the group is still dereferenced above */
        oig_release(oig);
}
EXPORT_SYMBOL(oig_complete_one);
1235
1236 static int oig_done(struct obd_io_group *oig)
1237 {
1238         int rc = 0;
1239         spin_lock(&oig->oig_lock);
1240         if (oig->oig_pending <= 0)
1241                 rc = 1;
1242         spin_unlock(&oig->oig_lock);
1243         return rc;
1244 }
1245
/* Interrupt handler installed by oig_wait(); \a data is the I/O group.
 * Calls occ_interrupted() once for every callback context on the group's
 * list that has not been flagged as interrupted yet. */
static void interrupted_oig(void *data)
{
        struct obd_io_group *oig = data;
        struct oig_callback_context *occ;

        spin_lock(&oig->oig_lock);
        /* We need to restart the processing each time we drop the lock, as
         * it is possible other threads called oig_complete_one() to remove
         * an entry elsewhere in the list while we dropped lock.  We need to
         * drop the lock because osc_ap_completion() calls oig_complete_one()
         * which re-gets this lock ;-) as well as a lock ordering issue. */
restart:
        list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
                /* already handled on an earlier pass */
                if (occ->interrupted)
                        continue;
                /* flag it before dropping the lock so the rescan skips it */
                occ->interrupted = 1;
                spin_unlock(&oig->oig_lock);
                occ->occ_interrupted(occ);
                spin_lock(&oig->oig_lock);
                goto restart;
        }
        spin_unlock(&oig->oig_lock);
}
1269
1270 int oig_wait(struct obd_io_group *oig)
1271 {
1272         struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
1273         int rc;
1274
1275         CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
1276
1277         do {
1278                 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
1279                 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
1280                 /* we can't continue until the oig has emptied and stopped
1281                  * referencing state that the caller will free upon return */
1282                 if (rc == -EINTR)
1283                         lwi = (struct l_wait_info){ 0, };
1284         } while (rc == -EINTR);
1285
1286         LASSERTF(oig->oig_pending == 0,
1287                  "exiting oig_wait(oig = %p) with %d pending\n", oig,
1288                  oig->oig_pending);
1289
1290         CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
1291         return oig->oig_rc;
1292 }
1293 EXPORT_SYMBOL(oig_wait);
1294
1295 void class_fail_export(struct obd_export *exp)
1296 {
1297         int rc, already_failed;
1298
1299         spin_lock(&exp->exp_lock);
1300         already_failed = exp->exp_failed;
1301         exp->exp_failed = 1;
1302         spin_unlock(&exp->exp_lock);
1303
1304         if (already_failed) {
1305                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1306                        exp, exp->exp_client_uuid.uuid);
1307                 return;
1308         }
1309
1310         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1311                exp, exp->exp_client_uuid.uuid);
1312
1313         if (obd_dump_on_timeout)
1314                 libcfs_debug_dumplog();
1315
1316         /* Most callers into obd_disconnect are removing their own reference
1317          * (request, for example) in addition to the one from the hash table.
1318          * We don't have such a reference here, so make one. */
1319         class_export_get(exp);
1320         rc = obd_disconnect(exp);
1321         if (rc)
1322                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1323         else
1324                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1325                        exp, exp->exp_client_uuid.uuid);
1326 }
1327 EXPORT_SYMBOL(class_fail_export);
1328
1329 char *obd_export_nid2str(struct obd_export *exp)
1330 {
1331         if (exp->exp_connection != NULL)
1332                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1333
1334         return "(no nid)";
1335 }
1336 EXPORT_SYMBOL(obd_export_nid2str);
1337
1338 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1339 {
1340         struct obd_export *doomed_exp = NULL;
1341         int exports_evicted = 0;
1342
1343         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1344
1345         do {
1346                 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1347                 if (doomed_exp == NULL)
1348                         break;
1349
1350                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1351                          "nid %s found, wanted nid %s, requested nid %s\n",
1352                          obd_export_nid2str(doomed_exp),
1353                          libcfs_nid2str(nid_key), nid);
1354                 LASSERTF(doomed_exp != obd->obd_self_export,
1355                          "self-export is hashed by NID?\n");
1356                 exports_evicted++;
1357                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1358                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1359                        exports_evicted);
1360                 class_fail_export(doomed_exp);
1361                 class_export_put(doomed_exp);
1362         } while (1);
1363
1364         if (!exports_evicted)
1365                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1366                        obd->obd_name, nid);
1367         return exports_evicted;
1368 }
1369 EXPORT_SYMBOL(obd_export_evict_by_nid);
1370
1371 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1372 {
1373         struct obd_export *doomed_exp = NULL;
1374         struct obd_uuid doomed_uuid;
1375         int exports_evicted = 0;
1376
1377         obd_str2uuid(&doomed_uuid, uuid);
1378         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1379                 CERROR("%s: can't evict myself\n", obd->obd_name);
1380                 return exports_evicted;
1381         }
1382
1383         doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1384
1385         if (doomed_exp == NULL) {
1386                 CERROR("%s: can't disconnect %s: no exports found\n",
1387                        obd->obd_name, uuid);
1388         } else {
1389                 CWARN("%s: evicting %s at adminstrative request\n",
1390                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1391                 class_fail_export(doomed_exp);
1392                 class_export_put(doomed_exp);
1393                 exports_evicted++;
1394         }
1395
1396         return exports_evicted;
1397 }
1398 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1399
1400 /**
1401  * kill zombie imports and exports
1402  */
1403 void obd_zombie_impexp_cull(void)
1404 {
1405         struct obd_import *import;
1406         struct obd_export *export;
1407         ENTRY;
1408
1409         do {
1410                 spin_lock (&obd_zombie_impexp_lock);
1411
1412                 import = NULL;
1413                 if (!list_empty(&obd_zombie_imports)) {
1414                         import = list_entry(obd_zombie_imports.next,
1415                                             struct obd_import,
1416                                             imp_zombie_chain);
1417                         list_del(&import->imp_zombie_chain);
1418                 }
1419
1420                 export = NULL;
1421                 if (!list_empty(&obd_zombie_exports)) {
1422                         export = list_entry(obd_zombie_exports.next,
1423                                             struct obd_export,
1424                                             exp_obd_chain);
1425                         list_del_init(&export->exp_obd_chain);
1426                 }
1427
1428                 spin_unlock(&obd_zombie_impexp_lock);
1429
1430                 if (import != NULL)
1431                         class_import_destroy(import);
1432
1433                 if (export != NULL)
1434                         class_export_destroy(export);
1435
1436         } while (import != NULL || export != NULL);
1437         EXIT;
1438 }
1439
/* completed by the zombie thread once it has started (or failed to) */
static struct completion        obd_zombie_start;
/* completed by the zombie thread when it leaves its main loop */
static struct completion        obd_zombie_stop;
/* bit flags controlling the zombie thread, see OBD_ZOMBIE_STOP */
static unsigned long            obd_zombie_flags;
/* the zombie thread sleeps here until notified */
static cfs_waitq_t              obd_zombie_waitq;

enum {
        /* bit number (not a mask) in obd_zombie_flags, used with
         * set_bit()/test_bit(): ask the zombie thread to exit */
        OBD_ZOMBIE_STOP = 1
};
1448
1449 /**
1450  * check for work for kill zombie import/export thread.
1451  */
1452 int obd_zombie_impexp_check(void *arg)
1453 {
1454         int rc;
1455
1456         spin_lock(&obd_zombie_impexp_lock);
1457         rc = list_empty(&obd_zombie_imports) &&
1458              list_empty(&obd_zombie_exports) &&
1459              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1460
1461         spin_unlock(&obd_zombie_impexp_lock);
1462
1463         RETURN(rc);
1464 }
1465
/**
 * notify import/export destroy thread about new zombie.
 *
 * Signals obd_zombie_waitq, the wait queue the zombie thread sleeps on.
 */
static void obd_zombie_impexp_notify(void)
{
        cfs_waitq_signal(&obd_zombie_waitq);
}
1473
1474 #ifdef __KERNEL__
1475
1476 /**
1477  * destroy zombie export/import thread.
1478  */
1479 static int obd_zombie_impexp_thread(void *unused)
1480 {
1481         int rc;
1482
1483         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1484                 complete(&obd_zombie_start);
1485                 RETURN(rc);
1486         }
1487
1488         complete(&obd_zombie_start);
1489
1490         while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1491                 struct l_wait_info lwi = { 0 };
1492
1493                 l_wait_event(obd_zombie_waitq, !obd_zombie_impexp_check(NULL), &lwi);
1494
1495                 obd_zombie_impexp_cull();
1496         }
1497
1498         complete(&obd_zombie_stop);
1499
1500         RETURN(0);
1501 }
1502
1503 #else /* ! KERNEL */
1504
/* entry counter used by obd_zombie_impexp_kill() to detect nested calls */
static atomic_t zombie_recur = ATOMIC_INIT(0);
/* callback handles returned by the liblustre registration calls in
 * obd_zombie_impexp_init(); needed again for deregistration */
static void *obd_zombie_impexp_work_cb;
static void *obd_zombie_impexp_idle_cb;
1508
1509 int obd_zombie_impexp_kill(void *arg)
1510 {
1511         int rc = 0;
1512
1513         if (atomic_inc_return(&zombie_recur) == 1) {
1514                 obd_zombie_impexp_cull();
1515                 rc = 1;
1516         }
1517         atomic_dec(&zombie_recur);
1518         return rc;
1519 }
1520
1521 #endif
1522
1523 /**
1524  * start destroy zombie import/export thread
1525  */
1526 int obd_zombie_impexp_init(void)
1527 {
1528         int rc;
1529
1530         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1531         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1532         spin_lock_init(&obd_zombie_impexp_lock);
1533         init_completion(&obd_zombie_start);
1534         init_completion(&obd_zombie_stop);
1535         cfs_waitq_init(&obd_zombie_waitq);
1536
1537 #ifdef __KERNEL__
1538         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1539         if (rc < 0)
1540                 RETURN(rc);
1541
1542         wait_for_completion(&obd_zombie_start);
1543 #else
1544
1545         obd_zombie_impexp_work_cb =
1546                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1547                                                  &obd_zombie_impexp_kill, NULL);
1548
1549         obd_zombie_impexp_idle_cb =
1550                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1551                                                  &obd_zombie_impexp_check, NULL);
1552         rc = 0;
1553
1554 #endif
1555         RETURN(rc);
1556 }
/**
 * stop destroy zombie import/export thread
 *
 * Sets the OBD_ZOMBIE_STOP bit and wakes the zombie wait queue.  In the
 * kernel this then blocks until the thread completes obd_zombie_stop; in
 * liblustre the wait and idle callbacks are deregistered instead.
 */
void obd_zombie_impexp_stop(void)
{
        set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
        obd_zombie_impexp_notify();
#ifdef __KERNEL__
        wait_for_completion(&obd_zombie_stop);
#else
        liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
        liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
#endif
}