Whamcloud - gitweb
b=17310
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
50
51 extern struct list_head obd_types;
52 spinlock_t obd_types_lock;
53
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
58
59 struct list_head  obd_zombie_imports;
60 struct list_head  obd_zombie_exports;
61 spinlock_t        obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
63
64 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
65
66 /*
67  * support functions: we could use inter-module communication, but this
68  * is more portable to other OS's
69  */
70 static struct obd_device *obd_device_alloc(void)
71 {
72         struct obd_device *obd;
73
74         OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
75         if (obd != NULL) {
76                 obd->obd_magic = OBD_DEVICE_MAGIC;
77         }
78         return obd;
79 }
80 EXPORT_SYMBOL(obd_device_alloc);
81
82 static void obd_device_free(struct obd_device *obd)
83 {
84         LASSERT(obd != NULL);
85         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
86                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
87         if (obd->obd_namespace != NULL) {
88                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89                        obd, obd->obd_namespace, obd->obd_force);
90                 LBUG();
91         }
92         lu_ref_fini(&obd->obd_reference);
93         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
94 }
95
96 struct obd_type *class_search_type(const char *name)
97 {
98         struct list_head *tmp;
99         struct obd_type *type;
100
101         spin_lock(&obd_types_lock);
102         list_for_each(tmp, &obd_types) {
103                 type = list_entry(tmp, struct obd_type, typ_chain);
104                 if (strcmp(type->typ_name, name) == 0) {
105                         spin_unlock(&obd_types_lock);
106                         return type;
107                 }
108         }
109         spin_unlock(&obd_types_lock);
110         return NULL;
111 }
112
113 struct obd_type *class_get_type(const char *name)
114 {
115         struct obd_type *type = class_search_type(name);
116
117 #ifdef CONFIG_KMOD
118         if (!type) {
119                 const char *modname = name;
120                 if (!request_module(modname)) {
121                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
122                         type = class_search_type(name);
123                 } else {
124                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
125                                            modname);
126                 }
127         }
128 #endif
129         if (type) {
130                 spin_lock(&type->obd_type_lock);
131                 type->typ_refcnt++;
132                 try_module_get(type->typ_dt_ops->o_owner);
133                 spin_unlock(&type->obd_type_lock);
134         }
135         return type;
136 }
137
138 void class_put_type(struct obd_type *type)
139 {
140         LASSERT(type);
141         spin_lock(&type->obd_type_lock);
142         type->typ_refcnt--;
143         module_put(type->typ_dt_ops->o_owner);
144         spin_unlock(&type->obd_type_lock);
145 }
146
147 #define CLASS_MAX_NAME 1024
148
149 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
150                         struct lprocfs_vars *vars, const char *name,
151                         struct lu_device_type *ldt)
152 {
153         struct obd_type *type;
154         int rc = 0;
155         ENTRY;
156
157         /* sanity check */
158         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
159
160         if (class_search_type(name)) {
161                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
162                 RETURN(-EEXIST);
163         }
164
165         rc = -ENOMEM;
166         OBD_ALLOC(type, sizeof(*type));
167         if (type == NULL)
168                 RETURN(rc);
169
170         OBD_ALLOC_PTR(type->typ_dt_ops);
171         OBD_ALLOC_PTR(type->typ_md_ops);
172         OBD_ALLOC(type->typ_name, strlen(name) + 1);
173
174         if (type->typ_dt_ops == NULL ||
175             type->typ_md_ops == NULL ||
176             type->typ_name == NULL)
177                 GOTO (failed, rc);
178
179         *(type->typ_dt_ops) = *dt_ops;
180         /* md_ops is optional */
181         if (md_ops)
182                 *(type->typ_md_ops) = *md_ops;
183         strcpy(type->typ_name, name);
184         spin_lock_init(&type->obd_type_lock);
185
186 #ifdef LPROCFS
187         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
188                                               vars, type);
189         if (IS_ERR(type->typ_procroot)) {
190                 rc = PTR_ERR(type->typ_procroot);
191                 type->typ_procroot = NULL;
192                 GOTO (failed, rc);
193         }
194 #endif
195         if (ldt != NULL) {
196                 type->typ_lu = ldt;
197                 rc = lu_device_type_init(ldt);
198                 if (rc != 0)
199                         GOTO (failed, rc);
200         }
201
202         spin_lock(&obd_types_lock);
203         list_add(&type->typ_chain, &obd_types);
204         spin_unlock(&obd_types_lock);
205
206         RETURN (0);
207
208  failed:
209         if (type->typ_name != NULL)
210                 OBD_FREE(type->typ_name, strlen(name) + 1);
211         if (type->typ_md_ops != NULL)
212                 OBD_FREE_PTR(type->typ_md_ops);
213         if (type->typ_dt_ops != NULL)
214                 OBD_FREE_PTR(type->typ_dt_ops);
215         OBD_FREE(type, sizeof(*type));
216         RETURN(rc);
217 }
218
219 int class_unregister_type(const char *name)
220 {
221         struct obd_type *type = class_search_type(name);
222         ENTRY;
223
224         if (!type) {
225                 CERROR("unknown obd type\n");
226                 RETURN(-EINVAL);
227         }
228
229         if (type->typ_refcnt) {
230                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
231                 /* This is a bad situation, let's make the best of it */
232                 /* Remove ops, but leave the name for debugging */
233                 OBD_FREE_PTR(type->typ_dt_ops);
234                 OBD_FREE_PTR(type->typ_md_ops);
235                 RETURN(-EBUSY);
236         }
237
238         if (type->typ_procroot) {
239                 lprocfs_remove(&type->typ_procroot);
240         }
241
242         if (type->typ_lu)
243                 lu_device_type_fini(type->typ_lu);
244
245         spin_lock(&obd_types_lock);
246         list_del(&type->typ_chain);
247         spin_unlock(&obd_types_lock);
248         OBD_FREE(type->typ_name, strlen(name) + 1);
249         if (type->typ_dt_ops != NULL)
250                 OBD_FREE_PTR(type->typ_dt_ops);
251         if (type->typ_md_ops != NULL)
252                 OBD_FREE_PTR(type->typ_md_ops);
253         OBD_FREE(type, sizeof(*type));
254         RETURN(0);
255 } /* class_unregister_type */
256
257 /**
258  * Create a new obd device.
259  *
260  * Find an empty slot in ::obd_devs[], create a new obd device in it.
261  *
262  * \param typename [in] obd device type string.
263  * \param name     [in] obd device name.
264  *
265  * \retval NULL if create fails, otherwise return the obd device
266  *         pointer created.
267  */
268 struct obd_device *class_newdev(const char *type_name, const char *name)
269 {
270         struct obd_device *result = NULL;
271         struct obd_device *newdev;
272         struct obd_type *type = NULL;
273         int i;
274         int new_obd_minor = 0;
275
276         if (strlen(name) >= MAX_OBD_NAME) {
277                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
278                 RETURN(ERR_PTR(-EINVAL));
279         }
280
281         type = class_get_type(type_name);
282         if (type == NULL){
283                 CERROR("OBD: unknown type: %s\n", type_name);
284                 RETURN(ERR_PTR(-ENODEV));
285         }
286
287         newdev = obd_device_alloc();
288         if (newdev == NULL) {
289                 class_put_type(type);
290                 RETURN(ERR_PTR(-ENOMEM));
291         }
292         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
293
294         spin_lock(&obd_dev_lock);
295         for (i = 0; i < class_devno_max(); i++) {
296                 struct obd_device *obd = class_num2obd(i);
297                 if (obd && obd->obd_name &&
298                     (strcmp(name, obd->obd_name) == 0)) {
299                         CERROR("Device %s already exists, won't add\n", name);
300                         if (result) {
301                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
302                                          "%p obd_magic %08x != %08x\n", result,
303                                          result->obd_magic, OBD_DEVICE_MAGIC);
304                                 LASSERTF(result->obd_minor == new_obd_minor,
305                                          "%p obd_minor %d != %d\n", result,
306                                          result->obd_minor, new_obd_minor);
307
308                                 obd_devs[result->obd_minor] = NULL;
309                                 result->obd_name[0]='\0';
310                          }
311                         result = ERR_PTR(-EEXIST);
312                         break;
313                 }
314                 if (!result && !obd) {
315                         result = newdev;
316                         result->obd_minor = i;
317                         new_obd_minor = i;
318                         result->obd_type = type;
319                         strncpy(result->obd_name, name,
320                                 sizeof(result->obd_name) - 1);
321                         obd_devs[i] = result;
322                 }
323         }
324         spin_unlock(&obd_dev_lock);
325
326         if (result == NULL && i >= class_devno_max()) {
327                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
328                        class_devno_max());
329                 result = ERR_PTR(-EOVERFLOW);
330         }
331
332         if (IS_ERR(result)) {
333                 obd_device_free(newdev);
334                 class_put_type(type);
335         } else {
336                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
337                        result->obd_name, result);
338         }
339         return result;
340 }
341
342 void class_release_dev(struct obd_device *obd)
343 {
344         struct obd_type *obd_type = obd->obd_type;
345
346         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
347                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
348         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
349                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
350         LASSERT(obd_type != NULL);
351
352         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
353                obd->obd_name,obd->obd_type->typ_name);
354
355         spin_lock(&obd_dev_lock);
356         obd_devs[obd->obd_minor] = NULL;
357         spin_unlock(&obd_dev_lock);
358         obd_device_free(obd);
359
360         class_put_type(obd_type);
361 }
362
363 int class_name2dev(const char *name)
364 {
365         int i;
366
367         if (!name)
368                 return -1;
369
370         spin_lock(&obd_dev_lock);
371         for (i = 0; i < class_devno_max(); i++) {
372                 struct obd_device *obd = class_num2obd(i);
373                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
374                         /* Make sure we finished attaching before we give
375                            out any references */
376                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
377                         if (obd->obd_attached) {
378                                 spin_unlock(&obd_dev_lock);
379                                 return i;
380                         }
381                         break;
382                 }
383         }
384         spin_unlock(&obd_dev_lock);
385
386         return -1;
387 }
388
389 struct obd_device *class_name2obd(const char *name)
390 {
391         int dev = class_name2dev(name);
392
393         if (dev < 0 || dev > class_devno_max())
394                 return NULL;
395         return class_num2obd(dev);
396 }
397
398 int class_uuid2dev(struct obd_uuid *uuid)
399 {
400         int i;
401
402         spin_lock(&obd_dev_lock);
403         for (i = 0; i < class_devno_max(); i++) {
404                 struct obd_device *obd = class_num2obd(i);
405                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
406                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
407                         spin_unlock(&obd_dev_lock);
408                         return i;
409                 }
410         }
411         spin_unlock(&obd_dev_lock);
412
413         return -1;
414 }
415
416 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
417 {
418         int dev = class_uuid2dev(uuid);
419         if (dev < 0)
420                 return NULL;
421         return class_num2obd(dev);
422 }
423
424 /**
425  * Get obd device from ::obd_devs[]
426  *
427  * \param num [in] array index
428  *
429  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
430  *         otherwise return the obd device there.
431  */
432 struct obd_device *class_num2obd(int num)
433 {
434         struct obd_device *obd = NULL;
435
436         if (num < class_devno_max()) {
437                 obd = obd_devs[num];
438                 if (obd == NULL)
439                         return NULL;
440
441                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
442                          "%p obd_magic %08x != %08x\n",
443                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
444                 LASSERTF(obd->obd_minor == num,
445                          "%p obd_minor %0d != %0d\n",
446                          obd, obd->obd_minor, num);
447         }
448
449         return obd;
450 }
451
452 void class_obd_list(void)
453 {
454         char *status;
455         int i;
456
457         spin_lock(&obd_dev_lock);
458         for (i = 0; i < class_devno_max(); i++) {
459                 struct obd_device *obd = class_num2obd(i);
460                 if (obd == NULL)
461                         continue;
462                 if (obd->obd_stopping)
463                         status = "ST";
464                 else if (obd->obd_set_up)
465                         status = "UP";
466                 else if (obd->obd_attached)
467                         status = "AT";
468                 else
469                         status = "--";
470                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
471                          i, status, obd->obd_type->typ_name,
472                          obd->obd_name, obd->obd_uuid.uuid,
473                          atomic_read(&obd->obd_refcount));
474         }
475         spin_unlock(&obd_dev_lock);
476         return;
477 }
478
479 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
480    specified, then only the client with that uuid is returned,
481    otherwise any client connected to the tgt is returned. */
482 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
483                                           const char * typ_name,
484                                           struct obd_uuid *grp_uuid)
485 {
486         int i;
487
488         spin_lock(&obd_dev_lock);
489         for (i = 0; i < class_devno_max(); i++) {
490                 struct obd_device *obd = class_num2obd(i);
491                 if (obd == NULL)
492                         continue;
493                 if ((strncmp(obd->obd_type->typ_name, typ_name,
494                              strlen(typ_name)) == 0)) {
495                         if (obd_uuid_equals(tgt_uuid,
496                                             &obd->u.cli.cl_target_uuid) &&
497                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
498                                                          &obd->obd_uuid) : 1)) {
499                                 spin_unlock(&obd_dev_lock);
500                                 return obd;
501                         }
502                 }
503         }
504         spin_unlock(&obd_dev_lock);
505
506         return NULL;
507 }
508
509 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
510                                             struct obd_uuid *grp_uuid)
511 {
512         struct obd_device *obd;
513
514         obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
515         if (!obd)
516                 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
517                                             grp_uuid);
518         return obd;
519 }
520
521 /* Iterate the obd_device list looking devices have grp_uuid. Start
522    searching at *next, and if a device is found, the next index to look
523    at is saved in *next. If next is NULL, then the first matching device
524    will always be returned. */
525 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
526 {
527         int i;
528
529         if (next == NULL)
530                 i = 0;
531         else if (*next >= 0 && *next < class_devno_max())
532                 i = *next;
533         else
534                 return NULL;
535
536         spin_lock(&obd_dev_lock);
537         for (; i < class_devno_max(); i++) {
538                 struct obd_device *obd = class_num2obd(i);
539                 if (obd == NULL)
540                         continue;
541                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
542                         if (next != NULL)
543                                 *next = i+1;
544                         spin_unlock(&obd_dev_lock);
545                         return obd;
546                 }
547         }
548         spin_unlock(&obd_dev_lock);
549
550         return NULL;
551 }
552
553
554 void obd_cleanup_caches(void)
555 {
556         int rc;
557
558         ENTRY;
559         if (obd_device_cachep) {
560                 rc = cfs_mem_cache_destroy(obd_device_cachep);
561                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
562                 obd_device_cachep = NULL;
563         }
564         if (obdo_cachep) {
565                 rc = cfs_mem_cache_destroy(obdo_cachep);
566                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
567                 obdo_cachep = NULL;
568         }
569         if (import_cachep) {
570                 rc = cfs_mem_cache_destroy(import_cachep);
571                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
572                 import_cachep = NULL;
573         }
574         if (capa_cachep) {
575                 rc = cfs_mem_cache_destroy(capa_cachep);
576                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
577                 capa_cachep = NULL;
578         }
579         EXIT;
580 }
581
582 int obd_init_caches(void)
583 {
584         ENTRY;
585
586         LASSERT(obd_device_cachep == NULL);
587         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
588                                                  sizeof(struct obd_device),
589                                                  0, 0);
590         if (!obd_device_cachep)
591                 GOTO(out, -ENOMEM);
592
593         LASSERT(obdo_cachep == NULL);
594         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
595                                            0, 0);
596         if (!obdo_cachep)
597                 GOTO(out, -ENOMEM);
598
599         LASSERT(import_cachep == NULL);
600         import_cachep = cfs_mem_cache_create("ll_import_cache",
601                                              sizeof(struct obd_import),
602                                              0, 0);
603         if (!import_cachep)
604                 GOTO(out, -ENOMEM);
605
606         LASSERT(capa_cachep == NULL);
607         capa_cachep = cfs_mem_cache_create("capa_cache",
608                                            sizeof(struct obd_capa), 0, 0);
609         if (!capa_cachep)
610                 GOTO(out, -ENOMEM);
611
612         RETURN(0);
613  out:
614         obd_cleanup_caches();
615         RETURN(-ENOMEM);
616
617 }
618
619 /* map connection to client */
620 struct obd_export *class_conn2export(struct lustre_handle *conn)
621 {
622         struct obd_export *export;
623         ENTRY;
624
625         if (!conn) {
626                 CDEBUG(D_CACHE, "looking for null handle\n");
627                 RETURN(NULL);
628         }
629
630         if (conn->cookie == -1) {  /* this means assign a new connection */
631                 CDEBUG(D_CACHE, "want a new connection\n");
632                 RETURN(NULL);
633         }
634
635         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
636         export = class_handle2object(conn->cookie);
637         RETURN(export);
638 }
639
640 struct obd_device *class_exp2obd(struct obd_export *exp)
641 {
642         if (exp)
643                 return exp->exp_obd;
644         return NULL;
645 }
646
647 struct obd_device *class_conn2obd(struct lustre_handle *conn)
648 {
649         struct obd_export *export;
650         export = class_conn2export(conn);
651         if (export) {
652                 struct obd_device *obd = export->exp_obd;
653                 class_export_put(export);
654                 return obd;
655         }
656         return NULL;
657 }
658
659 struct obd_import *class_exp2cliimp(struct obd_export *exp)
660 {
661         struct obd_device *obd = exp->exp_obd;
662         if (obd == NULL)
663                 return NULL;
664         return obd->u.cli.cl_import;
665 }
666
667 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
668 {
669         struct obd_device *obd = class_conn2obd(conn);
670         if (obd == NULL)
671                 return NULL;
672         return obd->u.cli.cl_import;
673 }
674
675 /* Export management functions */
676 static void export_handle_addref(void *export)
677 {
678         class_export_get(export);
679 }
680
681 struct obd_export *class_export_get(struct obd_export *exp)
682 {
683         atomic_inc(&exp->exp_refcount);
684         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
685                atomic_read(&exp->exp_refcount));
686         return exp;
687 }
688 EXPORT_SYMBOL(class_export_get);
689
690 void class_export_put(struct obd_export *exp)
691 {
692         LASSERT(exp != NULL);
693         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
694                atomic_read(&exp->exp_refcount) - 1);
695         LASSERT(atomic_read(&exp->exp_refcount) > 0);
696         LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
697
698         if (atomic_dec_and_test(&exp->exp_refcount)) {
699                 LASSERT (list_empty(&exp->exp_obd_chain));
700
701                 CDEBUG(D_IOCTL, "final put %p/%s\n",
702                        exp, exp->exp_client_uuid.uuid);
703
704                 spin_lock(&obd_zombie_impexp_lock);
705                 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
706                 spin_unlock(&obd_zombie_impexp_lock);
707
708                 if (obd_zombie_impexp_notify != NULL)
709                         obd_zombie_impexp_notify();
710         }
711 }
712 EXPORT_SYMBOL(class_export_put);
713
714 static void class_export_destroy(struct obd_export *exp)
715 {
716         struct obd_device *obd = exp->exp_obd;
717         ENTRY;
718
719         LASSERT (atomic_read(&exp->exp_refcount) == 0);
720
721         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
722                exp->exp_client_uuid.uuid, obd->obd_name);
723
724         LASSERT(obd != NULL);
725
726         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
727         if (exp->exp_connection)
728                 ptlrpc_put_connection_superhack(exp->exp_connection);
729
730         LASSERT(list_empty(&exp->exp_outstanding_replies));
731         LASSERT(list_empty(&exp->exp_req_replay_queue));
732         obd_destroy_export(exp);
733         class_decref(obd, "export", exp);
734
735         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
736         EXIT;
737 }
738
739 /* Creates a new export, adds it to the hash table, and returns a
740  * pointer to it. The refcount is 2: one for the hash reference, and
741  * one for the pointer returned by this function. */
742 struct obd_export *class_new_export(struct obd_device *obd,
743                                     struct obd_uuid *cluuid)
744 {
745         struct obd_export *export;
746         int rc = 0;
747
748         OBD_ALLOC_PTR(export);
749         if (!export)
750                 return ERR_PTR(-ENOMEM);
751
752         export->exp_conn_cnt = 0;
753         export->exp_lock_hash = NULL;
754         atomic_set(&export->exp_refcount, 2);
755         atomic_set(&export->exp_rpc_count, 0);
756         export->exp_obd = obd;
757         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
758         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
759         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
760         class_handle_hash(&export->exp_handle, export_handle_addref);
761         export->exp_last_request_time = cfs_time_current_sec();
762         spin_lock_init(&export->exp_lock);
763         INIT_HLIST_NODE(&export->exp_uuid_hash);
764         INIT_HLIST_NODE(&export->exp_nid_hash);
765
766         export->exp_sp_peer = LUSTRE_SP_ANY;
767         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
768         export->exp_client_uuid = *cluuid;
769         obd_init_export(export);
770
771         spin_lock(&obd->obd_dev_lock);
772         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
773                 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
774                                             &export->exp_uuid_hash);
775                 if (rc != 0) {
776                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
777                                       obd->obd_name, cluuid->uuid, rc);
778                         spin_unlock(&obd->obd_dev_lock);
779                         class_handle_unhash(&export->exp_handle);
780                         OBD_FREE_PTR(export);
781                         return ERR_PTR(-EALREADY);
782                 }
783         }
784
785         LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
786         class_incref(obd, "export", export);
787         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
788         list_add_tail(&export->exp_obd_chain_timed,
789                       &export->exp_obd->obd_exports_timed);
790         export->exp_obd->obd_num_exports++;
791         spin_unlock(&obd->obd_dev_lock);
792
793         return export;
794 }
795 EXPORT_SYMBOL(class_new_export);
796
797 void class_unlink_export(struct obd_export *exp)
798 {
799         class_handle_unhash(&exp->exp_handle);
800
801         spin_lock(&exp->exp_obd->obd_dev_lock);
802         /* delete an uuid-export hashitem from hashtables */
803         if (!hlist_unhashed(&exp->exp_uuid_hash))
804                 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
805                                 &exp->exp_client_uuid,
806                                 &exp->exp_uuid_hash);
807
808         list_del_init(&exp->exp_obd_chain);
809         list_del_init(&exp->exp_obd_chain_timed);
810         exp->exp_obd->obd_num_exports--;
811         spin_unlock(&exp->exp_obd->obd_dev_lock);
812
813         class_export_put(exp);
814 }
815 EXPORT_SYMBOL(class_unlink_export);
816
817 /* Import management functions */
818 static void import_handle_addref(void *import)
819 {
820         class_import_get(import);
821 }
822
823 struct obd_import *class_import_get(struct obd_import *import)
824 {
825         LASSERT(atomic_read(&import->imp_refcount) >= 0);
826         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
827         atomic_inc(&import->imp_refcount);
828         CDEBUG(D_INFO, "import %p refcount=%d\n", import,
829                atomic_read(&import->imp_refcount));
830         return import;
831 }
832 EXPORT_SYMBOL(class_import_get);
833
834 void class_import_put(struct obd_import *import)
835 {
836         ENTRY;
837
838         LASSERT(atomic_read(&import->imp_refcount) > 0);
839         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
840         LASSERT(list_empty(&import->imp_zombie_chain));
841
842         CDEBUG(D_INFO, "import %p refcount=%d\n", import,
843                atomic_read(&import->imp_refcount) - 1);
844
845         if (atomic_dec_and_test(&import->imp_refcount)) {
846
847                 CDEBUG(D_INFO, "final put import %p\n", import);
848
849                 spin_lock(&obd_zombie_impexp_lock);
850                 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
851                 spin_unlock(&obd_zombie_impexp_lock);
852
853                 if (obd_zombie_impexp_notify != NULL)
854                         obd_zombie_impexp_notify();
855         }
856
857         EXIT;
858 }
859 EXPORT_SYMBOL(class_import_put);
860
861 void class_import_destroy(struct obd_import *import)
862 {
863         ENTRY;
864
865         CDEBUG(D_IOCTL, "destroying import %p for %s\n", import,
866                 import->imp_obd->obd_name);
867
868         LASSERT(atomic_read(&import->imp_refcount) == 0);
869
870         ptlrpc_put_connection_superhack(import->imp_connection);
871
872         while (!list_empty(&import->imp_conn_list)) {
873                 struct obd_import_conn *imp_conn;
874
875                 imp_conn = list_entry(import->imp_conn_list.next,
876                                       struct obd_import_conn, oic_item);
877                 list_del(&imp_conn->oic_item);
878                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
879                 OBD_FREE(imp_conn, sizeof(*imp_conn));
880         }
881
882         LASSERT(import->imp_sec == NULL);
883         class_decref(import->imp_obd, "import", import);
884         OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
885         EXIT;
886 }
887
888 static void init_imp_at(struct imp_at *at) {
889         int i;
890         at_init(&at->iat_net_latency, 0, 0);
891         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
892                 /* max service estimates are tracked on the server side, so
893                    don't use the AT history here, just use the last reported
894                    val. (But keep hist for proc histogram, worst_ever) */
895                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
896                         AT_FLG_NOHIST);
897         }
898 }
899
900 struct obd_import *class_new_import(struct obd_device *obd)
901 {
902         struct obd_import *imp;
903
904         OBD_ALLOC(imp, sizeof(*imp));
905         if (imp == NULL)
906                 return NULL;
907
908         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
909         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
910         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
911         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
912         spin_lock_init(&imp->imp_lock);
913         imp->imp_last_success_conn = 0;
914         imp->imp_state = LUSTRE_IMP_NEW;
915         imp->imp_obd = class_incref(obd, "import", imp);
916         sema_init(&imp->imp_sec_mutex, 1);
917         cfs_waitq_init(&imp->imp_recovery_waitq);
918
919         atomic_set(&imp->imp_refcount, 2);
920         atomic_set(&imp->imp_unregistering, 0);
921         atomic_set(&imp->imp_inflight, 0);
922         atomic_set(&imp->imp_replay_inflight, 0);
923         atomic_set(&imp->imp_inval_count, 0);
924         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
925         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
926         class_handle_hash(&imp->imp_handle, import_handle_addref);
927         init_imp_at(&imp->imp_at);
928
929         /* the default magic is V2, will be used in connect RPC, and
930          * then adjusted according to the flags in request/reply. */
931         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
932
933         return imp;
934 }
935 EXPORT_SYMBOL(class_new_import);
936
937 void class_destroy_import(struct obd_import *import)
938 {
939         LASSERT(import != NULL);
940         LASSERT(import != LP_POISON);
941
942         class_handle_unhash(&import->imp_handle);
943
944         spin_lock(&import->imp_lock);
945         import->imp_generation++;
946         spin_unlock(&import->imp_lock);
947         class_import_put(import);
948 }
949 EXPORT_SYMBOL(class_destroy_import);
950
951 /* A connection defines an export context in which preallocation can
952    be managed. This releases the export pointer reference, and returns
953    the export handle, so the export refcount is 1 when this function
954    returns. */
955 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
956                   struct obd_uuid *cluuid)
957 {
958         struct obd_export *export;
959         LASSERT(conn != NULL);
960         LASSERT(obd != NULL);
961         LASSERT(cluuid != NULL);
962         ENTRY;
963
964         export = class_new_export(obd, cluuid);
965         if (IS_ERR(export))
966                 RETURN(PTR_ERR(export));
967
968         conn->cookie = export->exp_handle.h_cookie;
969         class_export_put(export);
970
971         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
972                cluuid->uuid, conn->cookie);
973         RETURN(0);
974 }
975 EXPORT_SYMBOL(class_connect);
976
977 /* if export is involved in recovery then clean up related things */
978 void class_export_recovery_cleanup(struct obd_export *exp)
979 {
980         struct obd_device *obd = exp->exp_obd;
981
982         spin_lock_bh(&obd->obd_processing_task_lock);
983         if (obd->obd_recovering && exp->exp_in_recovery) {
984                 spin_lock(&exp->exp_lock);
985                 exp->exp_in_recovery = 0;
986                 spin_unlock(&exp->exp_lock);
987                 obd->obd_connected_clients--;
988                 /* each connected client is counted as recoverable */
989                 obd->obd_recoverable_clients--;
990                 if (exp->exp_req_replay_needed) {
991                         spin_lock(&exp->exp_lock);
992                         exp->exp_req_replay_needed = 0;
993                         spin_unlock(&exp->exp_lock);
994                         LASSERT(atomic_read(&obd->obd_req_replay_clients));
995                         atomic_dec(&obd->obd_req_replay_clients);
996                 }
997                 if (exp->exp_lock_replay_needed) {
998                         spin_lock(&exp->exp_lock);
999                         exp->exp_lock_replay_needed = 0;
1000                         spin_unlock(&exp->exp_lock);
1001                         LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1002                         atomic_dec(&obd->obd_lock_replay_clients);
1003                 }
1004         }
1005         spin_unlock_bh(&obd->obd_processing_task_lock);
1006 }
1007
1008 /* This function removes two references from the export: one for the
1009  * hash entry and one for the export pointer passed in.  The export
1010  * pointer passed to this function is destroyed should not be used
1011  * again. */
1012 int class_disconnect(struct obd_export *export)
1013 {
1014         int already_disconnected;
1015         ENTRY;
1016
1017         if (export == NULL) {
1018                 fixme();
1019                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1020                 RETURN(-EINVAL);
1021         }
1022
1023         spin_lock(&export->exp_lock);
1024         already_disconnected = export->exp_disconnected;
1025         export->exp_disconnected = 1;
1026
1027         if (!hlist_unhashed(&export->exp_nid_hash))
1028                 lustre_hash_del(export->exp_obd->obd_nid_hash,
1029                                 &export->exp_connection->c_peer.nid,
1030                                 &export->exp_nid_hash);
1031
1032         spin_unlock(&export->exp_lock);
1033
1034         /* class_cleanup(), abort_recovery(), and class_fail_export()
1035          * all end up in here, and if any of them race we shouldn't
1036          * call extra class_export_puts(). */
1037         if (already_disconnected)
1038                 RETURN(0);
1039
1040         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1041                export->exp_handle.h_cookie);
1042
1043         class_export_recovery_cleanup(export);
1044         class_unlink_export(export);
1045         class_export_put(export);
1046         RETURN(0);
1047 }
1048
1049 static void class_disconnect_export_list(struct list_head *list, int flags)
1050 {
1051         int rc;
1052         struct lustre_handle fake_conn;
1053         struct obd_export *fake_exp, *exp;
1054         ENTRY;
1055
1056         /* It's possible that an export may disconnect itself, but
1057          * nothing else will be added to this list. */
1058         while (!list_empty(list)) {
1059                 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1060                 class_export_get(exp);
1061
1062                 spin_lock(&exp->exp_lock);
1063                 exp->exp_flags = flags;
1064                 spin_unlock(&exp->exp_lock);
1065
1066                 if (obd_uuid_equals(&exp->exp_client_uuid,
1067                                     &exp->exp_obd->obd_uuid)) {
1068                         CDEBUG(D_HA,
1069                                "exp %p export uuid == obd uuid, don't discon\n",
1070                                exp);
1071                         /* Need to delete this now so we don't end up pointing
1072                          * to work_list later when this export is cleaned up. */
1073                         list_del_init(&exp->exp_obd_chain);
1074                         class_export_put(exp);
1075                         continue;
1076                 }
1077
1078                 fake_conn.cookie = exp->exp_handle.h_cookie;
1079                 fake_exp = class_conn2export(&fake_conn);
1080                 if (!fake_exp) {
1081                         class_export_put(exp);
1082                         continue;
1083                 }
1084
1085                 spin_lock(&fake_exp->exp_lock);
1086                 fake_exp->exp_flags = flags;
1087                 spin_unlock(&fake_exp->exp_lock);
1088
1089                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1090                        "last request at "CFS_TIME_T"\n",
1091                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1092                        exp, exp->exp_last_request_time);
1093                 rc = obd_disconnect(fake_exp);
1094                 class_export_put(exp);
1095         }
1096         EXIT;
1097 }
1098
1099 static inline int get_exp_flags_from_obd(struct obd_device *obd)
1100 {
1101         return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
1102                 (obd->obd_force ? OBD_OPT_FORCE : 0));
1103 }
1104
1105 void class_disconnect_exports(struct obd_device *obd)
1106 {
1107         struct list_head work_list;
1108         ENTRY;
1109
1110         /* Move all of the exports from obd_exports to a work list, en masse. */
1111         spin_lock(&obd->obd_dev_lock);
1112         list_add(&work_list, &obd->obd_exports);
1113         list_del_init(&obd->obd_exports);
1114         spin_unlock(&obd->obd_dev_lock);
1115
1116         if (!list_empty(&work_list)) {
1117                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1118                        "disconnecting them\n", obd->obd_minor, obd);
1119                 class_disconnect_export_list(&work_list,
1120                                              get_exp_flags_from_obd(obd));
1121         } else
1122                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1123                        obd->obd_minor, obd);
1124         EXIT;
1125 }
1126 EXPORT_SYMBOL(class_disconnect_exports);
1127
1128 /* Remove exports that have not completed recovery.
1129  */
1130 int class_disconnect_stale_exports(struct obd_device *obd,
1131                                    int (*test_export)(struct obd_export *))
1132 {
1133         struct list_head work_list;
1134         struct list_head *pos, *n;
1135         struct obd_export *exp;
1136         int cnt = 0;
1137         ENTRY;
1138
1139         CFS_INIT_LIST_HEAD(&work_list);
1140         spin_lock(&obd->obd_dev_lock);
1141         list_for_each_safe(pos, n, &obd->obd_exports) {
1142                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1143                 if (test_export(exp))
1144                         continue;
1145
1146                 list_del(&exp->exp_obd_chain);
1147                 list_add(&exp->exp_obd_chain, &work_list);
1148                 /* don't count self-export as client */
1149                 if (obd_uuid_equals(&exp->exp_client_uuid,
1150                                      &exp->exp_obd->obd_uuid))
1151                         continue;
1152
1153                 cnt++;
1154                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1155                        obd->obd_name, exp->exp_client_uuid.uuid,
1156                        exp->exp_connection == NULL ? "<unknown>" :
1157                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1158         }
1159         spin_unlock(&obd->obd_dev_lock);
1160
1161         CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
1162                obd->obd_name, cnt);
1163         class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
1164         RETURN(cnt);
1165 }
1166 EXPORT_SYMBOL(class_disconnect_stale_exports);
1167
1168 int oig_init(struct obd_io_group **oig_out)
1169 {
1170         struct obd_io_group *oig;
1171         ENTRY;
1172
1173         OBD_ALLOC(oig, sizeof(*oig));
1174         if (oig == NULL)
1175                 RETURN(-ENOMEM);
1176
1177         spin_lock_init(&oig->oig_lock);
1178         oig->oig_rc = 0;
1179         oig->oig_pending = 0;
1180         atomic_set(&oig->oig_refcount, 1);
1181         cfs_waitq_init(&oig->oig_waitq);
1182         CFS_INIT_LIST_HEAD(&oig->oig_occ_list);
1183
1184         *oig_out = oig;
1185         RETURN(0);
1186 };
1187 EXPORT_SYMBOL(oig_init);
1188
1189 static inline void oig_grab(struct obd_io_group *oig)
1190 {
1191         atomic_inc(&oig->oig_refcount);
1192 }
1193
1194 void oig_release(struct obd_io_group *oig)
1195 {
1196         if (atomic_dec_and_test(&oig->oig_refcount))
1197                 OBD_FREE(oig, sizeof(*oig));
1198 }
1199 EXPORT_SYMBOL(oig_release);
1200
1201 int oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ)
1202 {
1203         int rc = 0;
1204         CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
1205         spin_lock(&oig->oig_lock);
1206         if (oig->oig_rc) {
1207                 rc = oig->oig_rc;
1208         } else {
1209                 oig->oig_pending++;
1210                 if (occ != NULL)
1211                         list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
1212         }
1213         spin_unlock(&oig->oig_lock);
1214         oig_grab(oig);
1215
1216         return rc;
1217 }
1218 EXPORT_SYMBOL(oig_add_one);
1219
1220 void oig_complete_one(struct obd_io_group *oig,
1221                       struct oig_callback_context *occ, int rc)
1222 {
1223         cfs_waitq_t *wake = NULL;
1224         int old_rc;
1225
1226         spin_lock(&oig->oig_lock);
1227
1228         if (occ != NULL)
1229                 list_del_init(&occ->occ_oig_item);
1230
1231         old_rc = oig->oig_rc;
1232         if (oig->oig_rc == 0 && rc != 0)
1233                 oig->oig_rc = rc;
1234
1235         if (--oig->oig_pending <= 0)
1236                 wake = &oig->oig_waitq;
1237
1238         spin_unlock(&oig->oig_lock);
1239
1240         CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
1241                         "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
1242                         oig->oig_pending);
1243         if (wake)
1244                 cfs_waitq_signal(wake);
1245         oig_release(oig);
1246 }
1247 EXPORT_SYMBOL(oig_complete_one);
1248
1249 static int oig_done(struct obd_io_group *oig)
1250 {
1251         int rc = 0;
1252         spin_lock(&oig->oig_lock);
1253         if (oig->oig_pending <= 0)
1254                 rc = 1;
1255         spin_unlock(&oig->oig_lock);
1256         return rc;
1257 }
1258
1259 static void interrupted_oig(void *data)
1260 {
1261         struct obd_io_group *oig = data;
1262         struct oig_callback_context *occ;
1263
1264         spin_lock(&oig->oig_lock);
1265         /* We need to restart the processing each time we drop the lock, as
1266          * it is possible other threads called oig_complete_one() to remove
1267          * an entry elsewhere in the list while we dropped lock.  We need to
1268          * drop the lock because osc_ap_completion() calls oig_complete_one()
1269          * which re-gets this lock ;-) as well as a lock ordering issue. */
1270 restart:
1271         list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
1272                 if (occ->interrupted)
1273                         continue;
1274                 occ->interrupted = 1;
1275                 spin_unlock(&oig->oig_lock);
1276                 occ->occ_interrupted(occ);
1277                 spin_lock(&oig->oig_lock);
1278                 goto restart;
1279         }
1280         spin_unlock(&oig->oig_lock);
1281 }
1282
1283 int oig_wait(struct obd_io_group *oig)
1284 {
1285         struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
1286         int rc;
1287
1288         CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
1289
1290         do {
1291                 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
1292                 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
1293                 /* we can't continue until the oig has emptied and stopped
1294                  * referencing state that the caller will free upon return */
1295                 if (rc == -EINTR)
1296                         lwi = (struct l_wait_info){ 0, };
1297         } while (rc == -EINTR);
1298
1299         LASSERTF(oig->oig_pending == 0,
1300                  "exiting oig_wait(oig = %p) with %d pending\n", oig,
1301                  oig->oig_pending);
1302
1303         CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
1304         return oig->oig_rc;
1305 }
1306 EXPORT_SYMBOL(oig_wait);
1307
1308 void class_fail_export(struct obd_export *exp)
1309 {
1310         int rc, already_failed;
1311
1312         spin_lock(&exp->exp_lock);
1313         already_failed = exp->exp_failed;
1314         exp->exp_failed = 1;
1315         spin_unlock(&exp->exp_lock);
1316
1317         if (already_failed) {
1318                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1319                        exp, exp->exp_client_uuid.uuid);
1320                 return;
1321         }
1322
1323         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1324                exp, exp->exp_client_uuid.uuid);
1325
1326         if (obd_dump_on_timeout)
1327                 libcfs_debug_dumplog();
1328
1329         /* Most callers into obd_disconnect are removing their own reference
1330          * (request, for example) in addition to the one from the hash table.
1331          * We don't have such a reference here, so make one. */
1332         class_export_get(exp);
1333         rc = obd_disconnect(exp);
1334         if (rc)
1335                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1336         else
1337                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1338                        exp, exp->exp_client_uuid.uuid);
1339 }
1340 EXPORT_SYMBOL(class_fail_export);
1341
1342 char *obd_export_nid2str(struct obd_export *exp)
1343 {
1344         if (exp->exp_connection != NULL)
1345                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1346
1347         return "(no nid)";
1348 }
1349 EXPORT_SYMBOL(obd_export_nid2str);
1350
1351 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1352 {
1353         struct obd_export *doomed_exp = NULL;
1354         int exports_evicted = 0;
1355
1356         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1357
1358         do {
1359                 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1360                 if (doomed_exp == NULL)
1361                         break;
1362
1363                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1364                          "nid %s found, wanted nid %s, requested nid %s\n",
1365                          obd_export_nid2str(doomed_exp),
1366                          libcfs_nid2str(nid_key), nid);
1367                 LASSERTF(doomed_exp != obd->obd_self_export,
1368                          "self-export is hashed by NID?\n");
1369                 exports_evicted++;
1370                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1371                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1372                        exports_evicted);
1373                 class_fail_export(doomed_exp);
1374                 class_export_put(doomed_exp);
1375         } while (1);
1376
1377         if (!exports_evicted)
1378                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1379                        obd->obd_name, nid);
1380         return exports_evicted;
1381 }
1382 EXPORT_SYMBOL(obd_export_evict_by_nid);
1383
1384 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1385 {
1386         struct obd_export *doomed_exp = NULL;
1387         struct obd_uuid doomed_uuid;
1388         int exports_evicted = 0;
1389
1390         obd_str2uuid(&doomed_uuid, uuid);
1391         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1392                 CERROR("%s: can't evict myself\n", obd->obd_name);
1393                 return exports_evicted;
1394         }
1395
1396         doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1397
1398         if (doomed_exp == NULL) {
1399                 CERROR("%s: can't disconnect %s: no exports found\n",
1400                        obd->obd_name, uuid);
1401         } else {
1402                 CWARN("%s: evicting %s at adminstrative request\n",
1403                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1404                 class_fail_export(doomed_exp);
1405                 class_export_put(doomed_exp);
1406                 exports_evicted++;
1407         }
1408
1409         return exports_evicted;
1410 }
1411 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1412
1413 /**
1414  * kill zombie imports and exports
1415  */
1416 void obd_zombie_impexp_cull(void)
1417 {
1418         struct obd_import *import;
1419         struct obd_export *export;
1420         ENTRY;
1421
1422         do {
1423                 spin_lock (&obd_zombie_impexp_lock);
1424
1425                 import = NULL;
1426                 if (!list_empty(&obd_zombie_imports)) {
1427                         import = list_entry(obd_zombie_imports.next,
1428                                             struct obd_import,
1429                                             imp_zombie_chain);
1430                         list_del(&import->imp_zombie_chain);
1431                 }
1432
1433                 export = NULL;
1434                 if (!list_empty(&obd_zombie_exports)) {
1435                         export = list_entry(obd_zombie_exports.next,
1436                                             struct obd_export,
1437                                             exp_obd_chain);
1438                         list_del_init(&export->exp_obd_chain);
1439                 }
1440
1441                 spin_unlock(&obd_zombie_impexp_lock);
1442
1443                 if (import != NULL)
1444                         class_import_destroy(import);
1445
1446                 if (export != NULL)
1447                         class_export_destroy(export);
1448
1449         } while (import != NULL || export != NULL);
1450         EXIT;
1451 }
1452
1453 static struct completion        obd_zombie_start;
1454 static struct completion        obd_zombie_stop;
1455 static unsigned long            obd_zombie_flags;
1456 static cfs_waitq_t              obd_zombie_waitq;
1457
1458 enum {
1459         OBD_ZOMBIE_STOP = 1
1460 };
1461
1462 /**
1463  * check for work for kill zombie import/export thread.
1464  */
1465 int obd_zombie_impexp_check(void *arg)
1466 {
1467         int rc;
1468
1469         spin_lock(&obd_zombie_impexp_lock);
1470         rc = list_empty(&obd_zombie_imports) &&
1471              list_empty(&obd_zombie_exports) &&
1472              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1473
1474         spin_unlock(&obd_zombie_impexp_lock);
1475
1476         RETURN(rc);
1477 }
1478
1479 /**
1480  * notify import/export destroy thread about new zombie.
1481  */
1482 static void obd_zombie_impexp_notify(void)
1483 {
1484         cfs_waitq_signal(&obd_zombie_waitq);
1485 }
1486
1487 #ifdef __KERNEL__
1488
1489 /**
1490  * destroy zombie export/import thread.
1491  */
1492 static int obd_zombie_impexp_thread(void *unused)
1493 {
1494         int rc;
1495
1496         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1497                 complete(&obd_zombie_start);
1498                 RETURN(rc);
1499         }
1500
1501         complete(&obd_zombie_start);
1502
1503         while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1504                 struct l_wait_info lwi = { 0 };
1505
1506                 l_wait_event(obd_zombie_waitq, !obd_zombie_impexp_check(NULL), &lwi);
1507
1508                 obd_zombie_impexp_cull();
1509         }
1510
1511         complete(&obd_zombie_stop);
1512
1513         RETURN(0);
1514 }
1515
1516 #else /* ! KERNEL */
1517
1518 static atomic_t zombie_recur = ATOMIC_INIT(0);
1519 static void *obd_zombie_impexp_work_cb;
1520 static void *obd_zombie_impexp_idle_cb;
1521
1522 int obd_zombie_impexp_kill(void *arg)
1523 {
1524         int rc = 0;
1525
1526         if (atomic_inc_return(&zombie_recur) == 1) {
1527                 obd_zombie_impexp_cull();
1528                 rc = 1;
1529         }
1530         atomic_dec(&zombie_recur);
1531         return rc;
1532 }
1533
1534 #endif
1535
1536 /**
1537  * start destroy zombie import/export thread
1538  */
1539 int obd_zombie_impexp_init(void)
1540 {
1541         int rc;
1542
1543         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1544         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1545         spin_lock_init(&obd_zombie_impexp_lock);
1546         init_completion(&obd_zombie_start);
1547         init_completion(&obd_zombie_stop);
1548         cfs_waitq_init(&obd_zombie_waitq);
1549
1550 #ifdef __KERNEL__
1551         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1552         if (rc < 0)
1553                 RETURN(rc);
1554
1555         wait_for_completion(&obd_zombie_start);
1556 #else
1557
1558         obd_zombie_impexp_work_cb =
1559                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1560                                                  &obd_zombie_impexp_kill, NULL);
1561
1562         obd_zombie_impexp_idle_cb =
1563                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1564                                                  &obd_zombie_impexp_check, NULL);
1565         rc = 0;
1566
1567 #endif
1568         RETURN(rc);
1569 }
1570 /**
1571  * stop destroy zombie import/export thread
1572  */
1573 void obd_zombie_impexp_stop(void)
1574 {
1575         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1576         obd_zombie_impexp_notify();
1577 #ifdef __KERNEL__
1578         wait_for_completion(&obd_zombie_stop);
1579 #else
1580         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1581         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1582 #endif
1583 }