Whamcloud - gitweb
b=16777
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
50
51 extern struct list_head obd_types;
52 spinlock_t obd_types_lock;
53
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
58
59 struct list_head  obd_zombie_imports;
60 struct list_head  obd_zombie_exports;
61 spinlock_t        obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
63
64 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
65
66 /*
67  * support functions: we could use inter-module communication, but this
68  * is more portable to other OS's
69  */
70 static struct obd_device *obd_device_alloc(void)
71 {
72         struct obd_device *obd;
73
74         OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
75         if (obd != NULL) {
76                 obd->obd_magic = OBD_DEVICE_MAGIC;
77         }
78         return obd;
79 }
80 EXPORT_SYMBOL(obd_device_alloc);
81
82 static void obd_device_free(struct obd_device *obd)
83 {
84         LASSERT(obd != NULL);
85         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
86                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
87         if (obd->obd_namespace != NULL) {
88                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89                        obd, obd->obd_namespace, obd->obd_force);
90                 LBUG();
91         }
92         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
93 }
94 EXPORT_SYMBOL(obd_device_free);
95
96 struct obd_type *class_search_type(const char *name)
97 {
98         struct list_head *tmp;
99         struct obd_type *type;
100
101         spin_lock(&obd_types_lock);
102         list_for_each(tmp, &obd_types) {
103                 type = list_entry(tmp, struct obd_type, typ_chain);
104                 if (strcmp(type->typ_name, name) == 0) {
105                         spin_unlock(&obd_types_lock);
106                         return type;
107                 }
108         }
109         spin_unlock(&obd_types_lock);
110         return NULL;
111 }
112
113 struct obd_type *class_get_type(const char *name)
114 {
115         struct obd_type *type = class_search_type(name);
116
117 #ifdef CONFIG_KMOD
118         if (!type) {
119                 const char *modname = name;
120                 if (!request_module(modname)) {
121                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
122                         type = class_search_type(name);
123                 } else {
124                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
125                                            modname);
126                 }
127         }
128 #endif
129         if (type) {
130                 spin_lock(&type->obd_type_lock);
131                 type->typ_refcnt++;
132                 try_module_get(type->typ_dt_ops->o_owner);
133                 spin_unlock(&type->obd_type_lock);
134         }
135         return type;
136 }
137
138 void class_put_type(struct obd_type *type)
139 {
140         LASSERT(type);
141         spin_lock(&type->obd_type_lock);
142         type->typ_refcnt--;
143         module_put(type->typ_dt_ops->o_owner);
144         spin_unlock(&type->obd_type_lock);
145 }
146
147 #define CLASS_MAX_NAME 1024
148
149 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
150                         struct lprocfs_vars *vars, const char *name,
151                         struct lu_device_type *ldt)
152 {
153         struct obd_type *type;
154         int rc = 0;
155         ENTRY;
156
157         /* sanity check */
158         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
159
160         if (class_search_type(name)) {
161                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
162                 RETURN(-EEXIST);
163         }
164
165         rc = -ENOMEM;
166         OBD_ALLOC(type, sizeof(*type));
167         if (type == NULL)
168                 RETURN(rc);
169
170         OBD_ALLOC_PTR(type->typ_dt_ops);
171         OBD_ALLOC_PTR(type->typ_md_ops);
172         OBD_ALLOC(type->typ_name, strlen(name) + 1);
173
174         if (type->typ_dt_ops == NULL ||
175             type->typ_md_ops == NULL ||
176             type->typ_name == NULL)
177                 GOTO (failed, rc);
178
179         *(type->typ_dt_ops) = *dt_ops;
180         /* md_ops is optional */
181         if (md_ops)
182                 *(type->typ_md_ops) = *md_ops;
183         strcpy(type->typ_name, name);
184         spin_lock_init(&type->obd_type_lock);
185
186 #ifdef LPROCFS
187         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
188                                               vars, type);
189         if (IS_ERR(type->typ_procroot)) {
190                 rc = PTR_ERR(type->typ_procroot);
191                 type->typ_procroot = NULL;
192                 GOTO (failed, rc);
193         }
194 #endif
195         if (ldt != NULL) {
196                 type->typ_lu = ldt;
197                 rc = ldt->ldt_ops->ldto_init(ldt);
198                 if (rc != 0)
199                         GOTO (failed, rc);
200         }
201
202         spin_lock(&obd_types_lock);
203         list_add(&type->typ_chain, &obd_types);
204         spin_unlock(&obd_types_lock);
205
206         RETURN (0);
207
208  failed:
209         if (type->typ_name != NULL)
210                 OBD_FREE(type->typ_name, strlen(name) + 1);
211         if (type->typ_md_ops != NULL)
212                 OBD_FREE_PTR(type->typ_md_ops);
213         if (type->typ_dt_ops != NULL)
214                 OBD_FREE_PTR(type->typ_dt_ops);
215         OBD_FREE(type, sizeof(*type));
216         RETURN(rc);
217 }
218
219 int class_unregister_type(const char *name)
220 {
221         struct obd_type *type = class_search_type(name);
222         ENTRY;
223
224         if (!type) {
225                 CERROR("unknown obd type\n");
226                 RETURN(-EINVAL);
227         }
228
229         if (type->typ_refcnt) {
230                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
231                 /* This is a bad situation, let's make the best of it */
232                 /* Remove ops, but leave the name for debugging */
233                 OBD_FREE_PTR(type->typ_dt_ops);
234                 OBD_FREE_PTR(type->typ_md_ops);
235                 RETURN(-EBUSY);
236         }
237
238         if (type->typ_procroot) {
239                 lprocfs_remove(&type->typ_procroot);
240         }
241
242         if (type->typ_lu)
243                 type->typ_lu->ldt_ops->ldto_fini(type->typ_lu);
244
245         spin_lock(&obd_types_lock);
246         list_del(&type->typ_chain);
247         spin_unlock(&obd_types_lock);
248         OBD_FREE(type->typ_name, strlen(name) + 1);
249         if (type->typ_dt_ops != NULL)
250                 OBD_FREE_PTR(type->typ_dt_ops);
251         if (type->typ_md_ops != NULL)
252                 OBD_FREE_PTR(type->typ_md_ops);
253         OBD_FREE(type, sizeof(*type));
254         RETURN(0);
255 } /* class_unregister_type */
256
257 /**
258  * Create a new obd device.
259  *
260  * Find an empty slot in ::obd_devs[], create a new obd device in it.
261  *
262  * \param typename [in] obd device type string.
263  * \param name     [in] obd device name.
264  *
265  * \retval NULL if create fails, otherwise return the obd device
266  *         pointer created.
267  */
268 struct obd_device *class_newdev(const char *type_name, const char *name)
269 {
270         struct obd_device *result = NULL;
271         struct obd_device *newdev;
272         struct obd_type *type = NULL;
273         int i;
274         int new_obd_minor = 0;
275
276         if (strlen(name) >= MAX_OBD_NAME) {
277                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
278                 RETURN(ERR_PTR(-EINVAL));
279         }
280
281         type = class_get_type(type_name);
282         if (type == NULL){
283                 CERROR("OBD: unknown type: %s\n", type_name);
284                 RETURN(ERR_PTR(-ENODEV));
285         }
286
287         newdev = obd_device_alloc();
288         if (newdev == NULL) {
289                 class_put_type(type);
290                 RETURN(ERR_PTR(-ENOMEM));
291         }
292         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
293
294         spin_lock(&obd_dev_lock);
295         for (i = 0; i < class_devno_max(); i++) {
296                 struct obd_device *obd = class_num2obd(i);
297                 if (obd && obd->obd_name &&
298                     (strcmp(name, obd->obd_name) == 0)) {
299                         CERROR("Device %s already exists, won't add\n", name);
300                         if (result) {
301                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
302                                          "%p obd_magic %08x != %08x\n", result,
303                                          result->obd_magic, OBD_DEVICE_MAGIC);
304                                 LASSERTF(result->obd_minor == new_obd_minor,
305                                          "%p obd_minor %d != %d\n", result,
306                                          result->obd_minor, new_obd_minor);
307
308                                 obd_devs[result->obd_minor] = NULL;
309                                 result->obd_name[0]='\0';
310                          }
311                         result = ERR_PTR(-EEXIST);
312                         break;
313                 }
314                 if (!result && !obd) {
315                         result = newdev;
316                         result->obd_minor = i;
317                         new_obd_minor = i;
318                         result->obd_type = type;
319                         strncpy(result->obd_name, name,
320                                 sizeof(result->obd_name) - 1);
321                         obd_devs[i] = result;
322                 }
323         }
324         spin_unlock(&obd_dev_lock);
325
326         if (result == NULL && i >= class_devno_max()) {
327                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
328                        class_devno_max());
329                 result = ERR_PTR(-EOVERFLOW);
330         }
331
332         if (IS_ERR(result)) {
333                 obd_device_free(newdev);
334                 class_put_type(type);
335         } else {
336                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
337                        result->obd_name, result);
338         }
339         return result;
340 }
341
342 void class_release_dev(struct obd_device *obd)
343 {
344         struct obd_type *obd_type = obd->obd_type;
345
346         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
347                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
348         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
349                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
350         LASSERT(obd_type != NULL);
351
352         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
353                obd->obd_name,obd->obd_type->typ_name);
354
355         spin_lock(&obd_dev_lock);
356         obd_devs[obd->obd_minor] = NULL;
357         spin_unlock(&obd_dev_lock);
358         obd_device_free(obd);
359
360         class_put_type(obd_type);
361 }
362
363 int class_name2dev(const char *name)
364 {
365         int i;
366
367         if (!name)
368                 return -1;
369
370         spin_lock(&obd_dev_lock);
371         for (i = 0; i < class_devno_max(); i++) {
372                 struct obd_device *obd = class_num2obd(i);
373                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
374                         /* Make sure we finished attaching before we give
375                            out any references */
376                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
377                         if (obd->obd_attached) {
378                                 spin_unlock(&obd_dev_lock);
379                                 return i;
380                         }
381                         break;
382                 }
383         }
384         spin_unlock(&obd_dev_lock);
385
386         return -1;
387 }
388
389 struct obd_device *class_name2obd(const char *name)
390 {
391         int dev = class_name2dev(name);
392
393         if (dev < 0 || dev > class_devno_max())
394                 return NULL;
395         return class_num2obd(dev);
396 }
397
398 int class_uuid2dev(struct obd_uuid *uuid)
399 {
400         int i;
401
402         spin_lock(&obd_dev_lock);
403         for (i = 0; i < class_devno_max(); i++) {
404                 struct obd_device *obd = class_num2obd(i);
405                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
406                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
407                         spin_unlock(&obd_dev_lock);
408                         return i;
409                 }
410         }
411         spin_unlock(&obd_dev_lock);
412
413         return -1;
414 }
415
416 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
417 {
418         int dev = class_uuid2dev(uuid);
419         if (dev < 0)
420                 return NULL;
421         return class_num2obd(dev);
422 }
423
424 /**
425  * Get obd device from ::obd_devs[]
426  *
427  * \param num [in] array index
428  *
429  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
430  *         otherwise return the obd device there.
431  */
432 struct obd_device *class_num2obd(int num)
433 {
434         struct obd_device *obd = NULL;
435
436         if (num < class_devno_max()) {
437                 obd = obd_devs[num];
438                 if (obd == NULL)
439                         return NULL;
440
441                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
442                          "%p obd_magic %08x != %08x\n",
443                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
444                 LASSERTF(obd->obd_minor == num,
445                          "%p obd_minor %0d != %0d\n",
446                          obd, obd->obd_minor, num);
447         }
448
449         return obd;
450 }
451
452 void class_obd_list(void)
453 {
454         char *status;
455         int i;
456
457         spin_lock(&obd_dev_lock);
458         for (i = 0; i < class_devno_max(); i++) {
459                 struct obd_device *obd = class_num2obd(i);
460                 if (obd == NULL)
461                         continue;
462                 if (obd->obd_stopping)
463                         status = "ST";
464                 else if (obd->obd_set_up)
465                         status = "UP";
466                 else if (obd->obd_attached)
467                         status = "AT";
468                 else
469                         status = "--";
470                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
471                          i, status, obd->obd_type->typ_name,
472                          obd->obd_name, obd->obd_uuid.uuid,
473                          atomic_read(&obd->obd_refcount));
474         }
475         spin_unlock(&obd_dev_lock);
476         return;
477 }
478
479 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
480    specified, then only the client with that uuid is returned,
481    otherwise any client connected to the tgt is returned. */
482 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
483                                           const char * typ_name,
484                                           struct obd_uuid *grp_uuid)
485 {
486         int i;
487
488         spin_lock(&obd_dev_lock);
489         for (i = 0; i < class_devno_max(); i++) {
490                 struct obd_device *obd = class_num2obd(i);
491                 if (obd == NULL)
492                         continue;
493                 if ((strncmp(obd->obd_type->typ_name, typ_name,
494                              strlen(typ_name)) == 0)) {
495                         if (obd_uuid_equals(tgt_uuid,
496                                             &obd->u.cli.cl_target_uuid) &&
497                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
498                                                          &obd->obd_uuid) : 1)) {
499                                 spin_unlock(&obd_dev_lock);
500                                 return obd;
501                         }
502                 }
503         }
504         spin_unlock(&obd_dev_lock);
505
506         return NULL;
507 }
508
509 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
510                                             struct obd_uuid *grp_uuid)
511 {
512         struct obd_device *obd;
513
514         obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
515         if (!obd)
516                 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
517                                             grp_uuid);
518         return obd;
519 }
520
521 /* Iterate the obd_device list looking devices have grp_uuid. Start
522    searching at *next, and if a device is found, the next index to look
523    at is saved in *next. If next is NULL, then the first matching device
524    will always be returned. */
525 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
526 {
527         int i;
528
529         if (next == NULL)
530                 i = 0;
531         else if (*next >= 0 && *next < class_devno_max())
532                 i = *next;
533         else
534                 return NULL;
535
536         spin_lock(&obd_dev_lock);
537         for (; i < class_devno_max(); i++) {
538                 struct obd_device *obd = class_num2obd(i);
539                 if (obd == NULL)
540                         continue;
541                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
542                         if (next != NULL)
543                                 *next = i+1;
544                         spin_unlock(&obd_dev_lock);
545                         return obd;
546                 }
547         }
548         spin_unlock(&obd_dev_lock);
549
550         return NULL;
551 }
552
553
554 void obd_cleanup_caches(void)
555 {
556         int rc;
557
558         ENTRY;
559         if (obd_device_cachep) {
560                 rc = cfs_mem_cache_destroy(obd_device_cachep);
561                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
562                 obd_device_cachep = NULL;
563         }
564         if (obdo_cachep) {
565                 rc = cfs_mem_cache_destroy(obdo_cachep);
566                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
567                 obdo_cachep = NULL;
568         }
569         if (import_cachep) {
570                 rc = cfs_mem_cache_destroy(import_cachep);
571                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
572                 import_cachep = NULL;
573         }
574         if (capa_cachep) {
575                 rc = cfs_mem_cache_destroy(capa_cachep);
576                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
577                 capa_cachep = NULL;
578         }
579         EXIT;
580 }
581
582 int obd_init_caches(void)
583 {
584         ENTRY;
585
586         LASSERT(obd_device_cachep == NULL);
587         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
588                                                  sizeof(struct obd_device),
589                                                  0, 0);
590         if (!obd_device_cachep)
591                 GOTO(out, -ENOMEM);
592
593         LASSERT(obdo_cachep == NULL);
594         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
595                                            0, 0);
596         if (!obdo_cachep)
597                 GOTO(out, -ENOMEM);
598
599         LASSERT(import_cachep == NULL);
600         import_cachep = cfs_mem_cache_create("ll_import_cache",
601                                              sizeof(struct obd_import),
602                                              0, 0);
603         if (!import_cachep)
604                 GOTO(out, -ENOMEM);
605
606         LASSERT(capa_cachep == NULL);
607         capa_cachep = cfs_mem_cache_create("capa_cache",
608                                            sizeof(struct obd_capa), 0, 0);
609         if (!capa_cachep)
610                 GOTO(out, -ENOMEM);
611
612         RETURN(0);
613  out:
614         obd_cleanup_caches();
615         RETURN(-ENOMEM);
616
617 }
618
619 /* map connection to client */
620 struct obd_export *class_conn2export(struct lustre_handle *conn)
621 {
622         struct obd_export *export;
623         ENTRY;
624
625         if (!conn) {
626                 CDEBUG(D_CACHE, "looking for null handle\n");
627                 RETURN(NULL);
628         }
629
630         if (conn->cookie == -1) {  /* this means assign a new connection */
631                 CDEBUG(D_CACHE, "want a new connection\n");
632                 RETURN(NULL);
633         }
634
635         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
636         export = class_handle2object(conn->cookie);
637         RETURN(export);
638 }
639
640 struct obd_device *class_exp2obd(struct obd_export *exp)
641 {
642         if (exp)
643                 return exp->exp_obd;
644         return NULL;
645 }
646
647 struct obd_device *class_conn2obd(struct lustre_handle *conn)
648 {
649         struct obd_export *export;
650         export = class_conn2export(conn);
651         if (export) {
652                 struct obd_device *obd = export->exp_obd;
653                 class_export_put(export);
654                 return obd;
655         }
656         return NULL;
657 }
658
659 struct obd_import *class_exp2cliimp(struct obd_export *exp)
660 {
661         struct obd_device *obd = exp->exp_obd;
662         if (obd == NULL)
663                 return NULL;
664         return obd->u.cli.cl_import;
665 }
666
667 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
668 {
669         struct obd_device *obd = class_conn2obd(conn);
670         if (obd == NULL)
671                 return NULL;
672         return obd->u.cli.cl_import;
673 }
674
/* Export management functions */

/* Addref callback registered with class_handle_hash() for exports. */
static void export_handle_addref(void *export)
{
        class_export_get(export);
}
680
681 void __class_export_put(struct obd_export *exp)
682 {
683         if (atomic_dec_and_test(&exp->exp_refcount)) {
684                 LASSERT (list_empty(&exp->exp_obd_chain));
685
686                 CDEBUG(D_IOCTL, "final put %p/%s\n",
687                        exp, exp->exp_client_uuid.uuid);
688
689                 spin_lock(&obd_zombie_impexp_lock);
690                 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
691                 spin_unlock(&obd_zombie_impexp_lock);
692
693                 if (obd_zombie_impexp_notify != NULL)
694                         obd_zombie_impexp_notify();
695         }
696 }
697 EXPORT_SYMBOL(__class_export_put);
698
699 void class_export_destroy(struct obd_export *exp)
700 {
701         struct obd_device *obd = exp->exp_obd;
702         ENTRY;
703
704         LASSERT (atomic_read(&exp->exp_refcount) == 0);
705
706         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
707                exp->exp_client_uuid.uuid, obd->obd_name);
708
709         LASSERT(obd != NULL);
710
711         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
712         if (exp->exp_connection)
713                 ptlrpc_put_connection_superhack(exp->exp_connection);
714
715         LASSERT(list_empty(&exp->exp_outstanding_replies));
716         LASSERT(list_empty(&exp->exp_req_replay_queue));
717         obd_destroy_export(exp);
718
719         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
720         class_decref(obd);
721         EXIT;
722 }
723
724 /* Creates a new export, adds it to the hash table, and returns a
725  * pointer to it. The refcount is 2: one for the hash reference, and
726  * one for the pointer returned by this function. */
727 struct obd_export *class_new_export(struct obd_device *obd,
728                                     struct obd_uuid *cluuid)
729 {
730         struct obd_export *export;
731         int rc = 0;
732
733         OBD_ALLOC_PTR(export);
734         if (!export)
735                 return ERR_PTR(-ENOMEM);
736
737         export->exp_conn_cnt = 0;
738         export->exp_lock_hash = NULL;
739         atomic_set(&export->exp_refcount, 2);
740         atomic_set(&export->exp_rpc_count, 0);
741         export->exp_obd = obd;
742         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
743         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
744         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
745         class_handle_hash(&export->exp_handle, export_handle_addref);
746         export->exp_last_request_time = cfs_time_current_sec();
747         spin_lock_init(&export->exp_lock);
748         INIT_HLIST_NODE(&export->exp_uuid_hash);
749         INIT_HLIST_NODE(&export->exp_nid_hash);
750
751         export->exp_sp_peer = LUSTRE_SP_ANY;
752         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
753         export->exp_client_uuid = *cluuid;
754         obd_init_export(export);
755
756         spin_lock(&obd->obd_dev_lock);
757         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
758                 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
759                                             &export->exp_uuid_hash);
760                 if (rc != 0) {
761                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
762                                       obd->obd_name, cluuid->uuid, rc);
763                         spin_unlock(&obd->obd_dev_lock);
764                         class_handle_unhash(&export->exp_handle);
765                         OBD_FREE_PTR(export);
766                         return ERR_PTR(-EALREADY);
767                 }
768         }
769
770         LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
771         class_incref(obd);
772         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
773         list_add_tail(&export->exp_obd_chain_timed,
774                       &export->exp_obd->obd_exports_timed);
775         export->exp_obd->obd_num_exports++;
776         spin_unlock(&obd->obd_dev_lock);
777
778         return export;
779 }
780 EXPORT_SYMBOL(class_new_export);
781
782 void class_unlink_export(struct obd_export *exp)
783 {
784         class_handle_unhash(&exp->exp_handle);
785
786         spin_lock(&exp->exp_obd->obd_dev_lock);
787         /* delete an uuid-export hashitem from hashtables */
788         if (!hlist_unhashed(&exp->exp_uuid_hash))
789                 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
790                                 &exp->exp_client_uuid,
791                                 &exp->exp_uuid_hash);
792
793         list_del_init(&exp->exp_obd_chain);
794         list_del_init(&exp->exp_obd_chain_timed);
795         exp->exp_obd->obd_num_exports--;
796         spin_unlock(&exp->exp_obd->obd_dev_lock);
797
798         class_export_put(exp);
799 }
800 EXPORT_SYMBOL(class_unlink_export);
801
/* Import management functions */

/* Addref callback registered with class_handle_hash() for imports. */
static void import_handle_addref(void *import)
{
        class_import_get(import);
}
807
808 struct obd_import *class_import_get(struct obd_import *import)
809 {
810         LASSERT(atomic_read(&import->imp_refcount) >= 0);
811         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
812         atomic_inc(&import->imp_refcount);
813         CDEBUG(D_INFO, "import %p refcount=%d\n", import,
814                atomic_read(&import->imp_refcount));
815         return import;
816 }
817 EXPORT_SYMBOL(class_import_get);
818
819 void class_import_put(struct obd_import *import)
820 {
821         ENTRY;
822
823         LASSERT(atomic_read(&import->imp_refcount) > 0);
824         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
825         LASSERT(list_empty(&import->imp_zombie_chain));
826
827         CDEBUG(D_INFO, "import %p refcount=%d\n", import,
828                atomic_read(&import->imp_refcount) - 1);
829
830         if (atomic_dec_and_test(&import->imp_refcount)) {
831
832                 CDEBUG(D_INFO, "final put import %p\n", import);
833
834                 spin_lock(&obd_zombie_impexp_lock);
835                 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
836                 spin_unlock(&obd_zombie_impexp_lock);
837
838                 if (obd_zombie_impexp_notify != NULL)
839                         obd_zombie_impexp_notify();
840         }
841
842         EXIT;
843 }
844 EXPORT_SYMBOL(class_import_put);
845
846 void class_import_destroy(struct obd_import *import)
847 {
848         ENTRY;
849
850         CDEBUG(D_IOCTL, "destroying import %p for %s\n", import,
851                 import->imp_obd->obd_name);
852
853         LASSERT(atomic_read(&import->imp_refcount) == 0);
854
855         ptlrpc_put_connection_superhack(import->imp_connection);
856
857         while (!list_empty(&import->imp_conn_list)) {
858                 struct obd_import_conn *imp_conn;
859
860                 imp_conn = list_entry(import->imp_conn_list.next,
861                                       struct obd_import_conn, oic_item);
862                 list_del(&imp_conn->oic_item);
863                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
864                 OBD_FREE(imp_conn, sizeof(*imp_conn));
865         }
866
867         LASSERT(import->imp_sec == NULL);
868         class_decref(import->imp_obd);
869         OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
870         EXIT;
871 }
872
873 static void init_imp_at(struct imp_at *at) {
874         int i;
875         at_init(&at->iat_net_latency, 0, 0);
876         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
877                 /* max service estimates are tracked on the server side, so
878                    don't use the AT history here, just use the last reported
879                    val. (But keep hist for proc histogram, worst_ever) */
880                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
881                         AT_FLG_NOHIST);
882         }
883 }
884
885 struct obd_import *class_new_import(struct obd_device *obd)
886 {
887         struct obd_import *imp;
888
889         OBD_ALLOC(imp, sizeof(*imp));
890         if (imp == NULL)
891                 return NULL;
892
893         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
894         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
895         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
896         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
897         spin_lock_init(&imp->imp_lock);
898         imp->imp_last_success_conn = 0;
899         imp->imp_state = LUSTRE_IMP_NEW;
900         imp->imp_obd = class_incref(obd);
901         sema_init(&imp->imp_sec_mutex, 1);
902         cfs_waitq_init(&imp->imp_recovery_waitq);
903
904         atomic_set(&imp->imp_refcount, 2);
905         atomic_set(&imp->imp_inflight, 0);
906         atomic_set(&imp->imp_replay_inflight, 0);
907         atomic_set(&imp->imp_inval_count, 0);
908         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
909         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
910         class_handle_hash(&imp->imp_handle, import_handle_addref);
911         init_imp_at(&imp->imp_at);
912
913         /* the default magic is V2, will be used in connect RPC, and
914          * then adjusted according to the flags in request/reply. */
915         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
916
917         return imp;
918 }
919 EXPORT_SYMBOL(class_new_import);
920
921 void class_destroy_import(struct obd_import *import)
922 {
923         LASSERT(import != NULL);
924         LASSERT(import != LP_POISON);
925
926         class_handle_unhash(&import->imp_handle);
927
928         spin_lock(&import->imp_lock);
929         import->imp_generation++;
930         spin_unlock(&import->imp_lock);
931         class_import_put(import);
932 }
933 EXPORT_SYMBOL(class_destroy_import);
934
935 /* A connection defines an export context in which preallocation can
936    be managed. This releases the export pointer reference, and returns
937    the export handle, so the export refcount is 1 when this function
938    returns. */
939 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
940                   struct obd_uuid *cluuid)
941 {
942         struct obd_export *export;
943         LASSERT(conn != NULL);
944         LASSERT(obd != NULL);
945         LASSERT(cluuid != NULL);
946         ENTRY;
947
948         export = class_new_export(obd, cluuid);
949         if (IS_ERR(export))
950                 RETURN(PTR_ERR(export));
951
952         conn->cookie = export->exp_handle.h_cookie;
953         class_export_put(export);
954
955         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
956                cluuid->uuid, conn->cookie);
957         RETURN(0);
958 }
959 EXPORT_SYMBOL(class_connect);
960
961 /* if export is involved in recovery then clean up related things */
962 void class_export_recovery_cleanup(struct obd_export *exp)
963 {
964         struct obd_device *obd = exp->exp_obd;
965
966         spin_lock_bh(&obd->obd_processing_task_lock);
967         if (obd->obd_recovering && exp->exp_in_recovery) {
968                 spin_lock(&exp->exp_lock);
969                 exp->exp_in_recovery = 0;
970                 spin_unlock(&exp->exp_lock);
971                 obd->obd_connected_clients--;
972                 /* each connected client is counted as recoverable */
973                 obd->obd_recoverable_clients--;
974                 if (exp->exp_req_replay_needed) {
975                         spin_lock(&exp->exp_lock);
976                         exp->exp_req_replay_needed = 0;
977                         spin_unlock(&exp->exp_lock);
978                         LASSERT(atomic_read(&obd->obd_req_replay_clients));
979                         atomic_dec(&obd->obd_req_replay_clients);
980                 }
981                 if (exp->exp_lock_replay_needed) {
982                         spin_lock(&exp->exp_lock);
983                         exp->exp_lock_replay_needed = 0;
984                         spin_unlock(&exp->exp_lock);
985                         LASSERT(atomic_read(&obd->obd_lock_replay_clients));
986                         atomic_dec(&obd->obd_lock_replay_clients);
987                 }
988         }
989         spin_unlock_bh(&obd->obd_processing_task_lock);
990 }
991
992 /* This function removes two references from the export: one for the
993  * hash entry and one for the export pointer passed in.  The export
994  * pointer passed to this function is destroyed should not be used
995  * again. */
996 int class_disconnect(struct obd_export *export)
997 {
998         int already_disconnected;
999         ENTRY;
1000
1001         if (export == NULL) {
1002                 fixme();
1003                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1004                 RETURN(-EINVAL);
1005         }
1006
1007         spin_lock(&export->exp_lock);
1008         already_disconnected = export->exp_disconnected;
1009         export->exp_disconnected = 1;
1010
1011         if (!hlist_unhashed(&export->exp_nid_hash))
1012                 lustre_hash_del(export->exp_obd->obd_nid_hash,
1013                                 &export->exp_connection->c_peer.nid,
1014                                 &export->exp_nid_hash);
1015
1016         spin_unlock(&export->exp_lock);
1017
1018         /* class_cleanup(), abort_recovery(), and class_fail_export()
1019          * all end up in here, and if any of them race we shouldn't
1020          * call extra class_export_puts(). */
1021         if (already_disconnected)
1022                 RETURN(0);
1023
1024         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1025                export->exp_handle.h_cookie);
1026
1027         class_export_recovery_cleanup(export);
1028         class_unlink_export(export);
1029         class_export_put(export);
1030         RETURN(0);
1031 }
1032
1033 static void class_disconnect_export_list(struct list_head *list, int flags)
1034 {
1035         int rc;
1036         struct lustre_handle fake_conn;
1037         struct obd_export *fake_exp, *exp;
1038         ENTRY;
1039
1040         /* It's possible that an export may disconnect itself, but
1041          * nothing else will be added to this list. */
1042         while (!list_empty(list)) {
1043                 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1044                 class_export_get(exp);
1045
1046                 spin_lock(&exp->exp_lock);
1047                 exp->exp_flags = flags;
1048                 spin_unlock(&exp->exp_lock);
1049
1050                 if (obd_uuid_equals(&exp->exp_client_uuid,
1051                                     &exp->exp_obd->obd_uuid)) {
1052                         CDEBUG(D_HA,
1053                                "exp %p export uuid == obd uuid, don't discon\n",
1054                                exp);
1055                         /* Need to delete this now so we don't end up pointing
1056                          * to work_list later when this export is cleaned up. */
1057                         list_del_init(&exp->exp_obd_chain);
1058                         class_export_put(exp);
1059                         continue;
1060                 }
1061
1062                 fake_conn.cookie = exp->exp_handle.h_cookie;
1063                 fake_exp = class_conn2export(&fake_conn);
1064                 if (!fake_exp) {
1065                         class_export_put(exp);
1066                         continue;
1067                 }
1068
1069                 spin_lock(&fake_exp->exp_lock);
1070                 fake_exp->exp_flags = flags;
1071                 spin_unlock(&fake_exp->exp_lock);
1072
1073                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1074                        "last request at "CFS_TIME_T"\n",
1075                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1076                        exp, exp->exp_last_request_time);
1077                 rc = obd_disconnect(fake_exp);
1078                 class_export_put(exp);
1079         }
1080         EXIT;
1081 }
1082
1083 static inline int get_exp_flags_from_obd(struct obd_device *obd)
1084 {
1085         return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
1086                 (obd->obd_force ? OBD_OPT_FORCE : 0));
1087 }
1088
1089 void class_disconnect_exports(struct obd_device *obd)
1090 {
1091         struct list_head work_list;
1092         ENTRY;
1093
1094         /* Move all of the exports from obd_exports to a work list, en masse. */
1095         spin_lock(&obd->obd_dev_lock);
1096         list_add(&work_list, &obd->obd_exports);
1097         list_del_init(&obd->obd_exports);
1098         spin_unlock(&obd->obd_dev_lock);
1099
1100         if (!list_empty(&work_list)) {
1101                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1102                        "disconnecting them\n", obd->obd_minor, obd);
1103                 class_disconnect_export_list(&work_list,
1104                                              get_exp_flags_from_obd(obd));
1105         } else
1106                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1107                        obd->obd_minor, obd);
1108         EXIT;
1109 }
1110 EXPORT_SYMBOL(class_disconnect_exports);
1111
1112 /* Remove exports that have not completed recovery.
1113  */
1114 int class_disconnect_stale_exports(struct obd_device *obd,
1115                                    int (*test_export)(struct obd_export *))
1116 {
1117         struct list_head work_list;
1118         struct list_head *pos, *n;
1119         struct obd_export *exp;
1120         int cnt = 0;
1121         ENTRY;
1122
1123         CFS_INIT_LIST_HEAD(&work_list);
1124         spin_lock(&obd->obd_dev_lock);
1125         list_for_each_safe(pos, n, &obd->obd_exports) {
1126                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1127                 if (test_export(exp))
1128                         continue;
1129
1130                 list_del(&exp->exp_obd_chain);
1131                 list_add(&exp->exp_obd_chain, &work_list);
1132                 /* don't count self-export as client */
1133                 if (obd_uuid_equals(&exp->exp_client_uuid,
1134                                      &exp->exp_obd->obd_uuid))
1135                         continue;
1136
1137                 cnt++;
1138                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1139                        obd->obd_name, exp->exp_client_uuid.uuid,
1140                        exp->exp_connection == NULL ? "<unknown>" :
1141                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1142         }
1143         spin_unlock(&obd->obd_dev_lock);
1144
1145         CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
1146                obd->obd_name, cnt);
1147         class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
1148         RETURN(cnt);
1149 }
1150 EXPORT_SYMBOL(class_disconnect_stale_exports);
1151
1152 int oig_init(struct obd_io_group **oig_out)
1153 {
1154         struct obd_io_group *oig;
1155         ENTRY;
1156
1157         OBD_ALLOC(oig, sizeof(*oig));
1158         if (oig == NULL)
1159                 RETURN(-ENOMEM);
1160
1161         spin_lock_init(&oig->oig_lock);
1162         oig->oig_rc = 0;
1163         oig->oig_pending = 0;
1164         atomic_set(&oig->oig_refcount, 1);
1165         cfs_waitq_init(&oig->oig_waitq);
1166         CFS_INIT_LIST_HEAD(&oig->oig_occ_list);
1167
1168         *oig_out = oig;
1169         RETURN(0);
1170 };
1171 EXPORT_SYMBOL(oig_init);
1172
1173 static inline void oig_grab(struct obd_io_group *oig)
1174 {
1175         atomic_inc(&oig->oig_refcount);
1176 }
1177
1178 void oig_release(struct obd_io_group *oig)
1179 {
1180         if (atomic_dec_and_test(&oig->oig_refcount))
1181                 OBD_FREE(oig, sizeof(*oig));
1182 }
1183 EXPORT_SYMBOL(oig_release);
1184
1185 int oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ)
1186 {
1187         int rc = 0;
1188         CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
1189         spin_lock(&oig->oig_lock);
1190         if (oig->oig_rc) {
1191                 rc = oig->oig_rc;
1192         } else {
1193                 oig->oig_pending++;
1194                 if (occ != NULL)
1195                         list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
1196         }
1197         spin_unlock(&oig->oig_lock);
1198         oig_grab(oig);
1199
1200         return rc;
1201 }
1202 EXPORT_SYMBOL(oig_add_one);
1203
1204 void oig_complete_one(struct obd_io_group *oig,
1205                       struct oig_callback_context *occ, int rc)
1206 {
1207         cfs_waitq_t *wake = NULL;
1208         int old_rc;
1209
1210         spin_lock(&oig->oig_lock);
1211
1212         if (occ != NULL)
1213                 list_del_init(&occ->occ_oig_item);
1214
1215         old_rc = oig->oig_rc;
1216         if (oig->oig_rc == 0 && rc != 0)
1217                 oig->oig_rc = rc;
1218
1219         if (--oig->oig_pending <= 0)
1220                 wake = &oig->oig_waitq;
1221
1222         spin_unlock(&oig->oig_lock);
1223
1224         CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
1225                         "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
1226                         oig->oig_pending);
1227         if (wake)
1228                 cfs_waitq_signal(wake);
1229         oig_release(oig);
1230 }
1231 EXPORT_SYMBOL(oig_complete_one);
1232
1233 static int oig_done(struct obd_io_group *oig)
1234 {
1235         int rc = 0;
1236         spin_lock(&oig->oig_lock);
1237         if (oig->oig_pending <= 0)
1238                 rc = 1;
1239         spin_unlock(&oig->oig_lock);
1240         return rc;
1241 }
1242
1243 static void interrupted_oig(void *data)
1244 {
1245         struct obd_io_group *oig = data;
1246         struct oig_callback_context *occ;
1247
1248         spin_lock(&oig->oig_lock);
1249         /* We need to restart the processing each time we drop the lock, as
1250          * it is possible other threads called oig_complete_one() to remove
1251          * an entry elsewhere in the list while we dropped lock.  We need to
1252          * drop the lock because osc_ap_completion() calls oig_complete_one()
1253          * which re-gets this lock ;-) as well as a lock ordering issue. */
1254 restart:
1255         list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
1256                 if (occ->interrupted)
1257                         continue;
1258                 occ->interrupted = 1;
1259                 spin_unlock(&oig->oig_lock);
1260                 occ->occ_interrupted(occ);
1261                 spin_lock(&oig->oig_lock);
1262                 goto restart;
1263         }
1264         spin_unlock(&oig->oig_lock);
1265 }
1266
1267 int oig_wait(struct obd_io_group *oig)
1268 {
1269         struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
1270         int rc;
1271
1272         CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
1273
1274         do {
1275                 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
1276                 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
1277                 /* we can't continue until the oig has emptied and stopped
1278                  * referencing state that the caller will free upon return */
1279                 if (rc == -EINTR)
1280                         lwi = (struct l_wait_info){ 0, };
1281         } while (rc == -EINTR);
1282
1283         LASSERTF(oig->oig_pending == 0,
1284                  "exiting oig_wait(oig = %p) with %d pending\n", oig,
1285                  oig->oig_pending);
1286
1287         CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
1288         return oig->oig_rc;
1289 }
1290 EXPORT_SYMBOL(oig_wait);
1291
1292 void class_fail_export(struct obd_export *exp)
1293 {
1294         int rc, already_failed;
1295
1296         spin_lock(&exp->exp_lock);
1297         already_failed = exp->exp_failed;
1298         exp->exp_failed = 1;
1299         spin_unlock(&exp->exp_lock);
1300
1301         if (already_failed) {
1302                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1303                        exp, exp->exp_client_uuid.uuid);
1304                 return;
1305         }
1306
1307         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1308                exp, exp->exp_client_uuid.uuid);
1309
1310         if (obd_dump_on_timeout)
1311                 libcfs_debug_dumplog();
1312
1313         /* Most callers into obd_disconnect are removing their own reference
1314          * (request, for example) in addition to the one from the hash table.
1315          * We don't have such a reference here, so make one. */
1316         class_export_get(exp);
1317         rc = obd_disconnect(exp);
1318         if (rc)
1319                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1320         else
1321                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1322                        exp, exp->exp_client_uuid.uuid);
1323 }
1324 EXPORT_SYMBOL(class_fail_export);
1325
1326 char *obd_export_nid2str(struct obd_export *exp)
1327 {
1328         if (exp->exp_connection != NULL)
1329                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1330
1331         return "(no nid)";
1332 }
1333 EXPORT_SYMBOL(obd_export_nid2str);
1334
1335 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1336 {
1337         struct obd_export *doomed_exp = NULL;
1338         int exports_evicted = 0;
1339
1340         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1341
1342         do {
1343                 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1344                 if (doomed_exp == NULL)
1345                         break;
1346
1347                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1348                          "nid %s found, wanted nid %s, requested nid %s\n",
1349                          obd_export_nid2str(doomed_exp),
1350                          libcfs_nid2str(nid_key), nid);
1351                 LASSERTF(doomed_exp != obd->obd_self_export,
1352                          "self-export is hashed by NID?\n");
1353                 exports_evicted++;
1354                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1355                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1356                        exports_evicted);
1357                 class_fail_export(doomed_exp);
1358                 class_export_put(doomed_exp);
1359         } while (1);
1360
1361         if (!exports_evicted)
1362                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1363                        obd->obd_name, nid);
1364         return exports_evicted;
1365 }
1366 EXPORT_SYMBOL(obd_export_evict_by_nid);
1367
1368 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1369 {
1370         struct obd_export *doomed_exp = NULL;
1371         struct obd_uuid doomed_uuid;
1372         int exports_evicted = 0;
1373
1374         obd_str2uuid(&doomed_uuid, uuid);
1375         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1376                 CERROR("%s: can't evict myself\n", obd->obd_name);
1377                 return exports_evicted;
1378         }
1379
1380         doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1381
1382         if (doomed_exp == NULL) {
1383                 CERROR("%s: can't disconnect %s: no exports found\n",
1384                        obd->obd_name, uuid);
1385         } else {
1386                 CWARN("%s: evicting %s at adminstrative request\n",
1387                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1388                 class_fail_export(doomed_exp);
1389                 class_export_put(doomed_exp);
1390                 exports_evicted++;
1391         }
1392
1393         return exports_evicted;
1394 }
1395 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1396
1397 /**
1398  * kill zombie imports and exports
1399  */
1400 void obd_zombie_impexp_cull(void)
1401 {
1402         struct obd_import *import;
1403         struct obd_export *export;
1404         ENTRY;
1405
1406         do {
1407                 spin_lock (&obd_zombie_impexp_lock);
1408
1409                 import = NULL;
1410                 if (!list_empty(&obd_zombie_imports)) {
1411                         import = list_entry(obd_zombie_imports.next,
1412                                             struct obd_import,
1413                                             imp_zombie_chain);
1414                         list_del(&import->imp_zombie_chain);
1415                 }
1416
1417                 export = NULL;
1418                 if (!list_empty(&obd_zombie_exports)) {
1419                         export = list_entry(obd_zombie_exports.next,
1420                                             struct obd_export,
1421                                             exp_obd_chain);
1422                         list_del_init(&export->exp_obd_chain);
1423                 }
1424
1425                 spin_unlock(&obd_zombie_impexp_lock);
1426
1427                 if (import != NULL)
1428                         class_import_destroy(import);
1429
1430                 if (export != NULL)
1431                         class_export_destroy(export);
1432
1433         } while (import != NULL || export != NULL);
1434         EXIT;
1435 }
1436
/* Completed by the zombie thread once it has started;
 * obd_zombie_impexp_init() waits on it. */
static struct completion        obd_zombie_start;
/* Completed by the zombie thread when its main loop exits;
 * obd_zombie_impexp_stop() waits on it. */
static struct completion        obd_zombie_stop;
/* Bit flags for the zombie thread, indexed by the enum below. */
static unsigned long            obd_zombie_flags;
/* Wait queue the zombie thread sleeps on; signalled by
 * obd_zombie_impexp_notify(). */
static cfs_waitq_t              obd_zombie_waitq;

/* Bit numbers within obd_zombie_flags (used with set_bit()/test_bit()). */
enum {
        /* request that the zombie thread stop */
        OBD_ZOMBIE_STOP = 1
};
1445
1446 /**
1447  * check for work for kill zombie import/export thread.
1448  */
1449 int obd_zombie_impexp_check(void *arg)
1450 {
1451         int rc;
1452
1453         spin_lock(&obd_zombie_impexp_lock);
1454         rc = list_empty(&obd_zombie_imports) &&
1455              list_empty(&obd_zombie_exports) &&
1456              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1457
1458         spin_unlock(&obd_zombie_impexp_lock);
1459
1460         RETURN(rc);
1461 }
1462
1463 /**
1464  * notify import/export destroy thread about new zombie.
1465  */
1466 static void obd_zombie_impexp_notify(void)
1467 {
1468         cfs_waitq_signal(&obd_zombie_waitq);
1469 }
1470
1471 #ifdef __KERNEL__
1472
1473 /**
1474  * destroy zombie export/import thread.
1475  */
1476 static int obd_zombie_impexp_thread(void *unused)
1477 {
1478         int rc;
1479
1480         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1481                 complete(&obd_zombie_start);
1482                 RETURN(rc);
1483         }
1484
1485         complete(&obd_zombie_start);
1486
1487         while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1488                 struct l_wait_info lwi = { 0 };
1489
1490                 l_wait_event(obd_zombie_waitq, !obd_zombie_impexp_check(NULL), &lwi);
1491
1492                 obd_zombie_impexp_cull();
1493         }
1494
1495         complete(&obd_zombie_stop);
1496
1497         RETURN(0);
1498 }
1499
1500 #else /* ! KERNEL */
1501
1502 static atomic_t zombie_recur = ATOMIC_INIT(0);
1503 static void *obd_zombie_impexp_work_cb;
1504 static void *obd_zombie_impexp_idle_cb;
1505
1506 int obd_zombie_impexp_kill(void *arg)
1507 {
1508         int rc = 0;
1509
1510         if (atomic_inc_return(&zombie_recur) == 1) {
1511                 obd_zombie_impexp_cull();
1512                 rc = 1;
1513         }
1514         atomic_dec(&zombie_recur);
1515         return rc;
1516 }
1517
1518 #endif
1519
1520 /**
1521  * start destroy zombie import/export thread
1522  */
1523 int obd_zombie_impexp_init(void)
1524 {
1525         int rc;
1526
1527         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1528         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1529         spin_lock_init(&obd_zombie_impexp_lock);
1530         init_completion(&obd_zombie_start);
1531         init_completion(&obd_zombie_stop);
1532         cfs_waitq_init(&obd_zombie_waitq);
1533
1534 #ifdef __KERNEL__
1535         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1536         if (rc < 0)
1537                 RETURN(rc);
1538
1539         wait_for_completion(&obd_zombie_start);
1540 #else
1541
1542         obd_zombie_impexp_work_cb =
1543                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1544                                                  &obd_zombie_impexp_kill, NULL);
1545
1546         obd_zombie_impexp_idle_cb =
1547                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1548                                                  &obd_zombie_impexp_check, NULL);
1549         rc = 0;
1550
1551 #endif
1552         RETURN(rc);
1553 }
1554 /**
1555  * stop destroy zombie import/export thread
1556  */
1557 void obd_zombie_impexp_stop(void)
1558 {
1559         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1560         obd_zombie_impexp_notify();
1561 #ifdef __KERNEL__
1562         wait_for_completion(&obd_zombie_stop);
1563 #else
1564         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1565         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1566 #endif
1567 }