Whamcloud - gitweb
Branch b1_8
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
50 #include <lustre_export.h>
51
52 extern struct list_head obd_types;
53 spinlock_t obd_types_lock;
54
55 cfs_mem_cache_t *obd_device_cachep;
56 cfs_mem_cache_t *obdo_cachep;
57 EXPORT_SYMBOL(obdo_cachep);
58 cfs_mem_cache_t *import_cachep;
59
60 struct list_head  obd_zombie_imports;
61 struct list_head  obd_zombie_exports;
62 spinlock_t        obd_zombie_impexp_lock;
63 static void obd_zombie_impexp_notify(void);
64
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
66
67 /*
68  * support functions: we could use inter-module communication, but this
69  * is more portable to other OS's
70  */
71 static struct obd_device *obd_device_alloc(void)
72 {
73         struct obd_device *obd;
74
75         OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
76         if (obd != NULL) {
77                 obd->obd_magic = OBD_DEVICE_MAGIC;
78         }
79         return obd;
80 }
81 EXPORT_SYMBOL(obd_device_alloc);
82
83 static void obd_device_free(struct obd_device *obd)
84 {
85         LASSERT(obd != NULL);
86         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic "
87                  "%08x != %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88         if (obd->obd_namespace != NULL) {
89                 CERROR("obd %p: namespace %p was not properly cleaned up "
90                        "(obd_force=%d)!\n",
91                        obd, obd->obd_namespace, obd->obd_force);
92                 LBUG();
93         }
94         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
95 }
96 EXPORT_SYMBOL(obd_device_free);
97
98 struct obd_type *class_search_type(const char *name)
99 {
100         struct list_head *tmp;
101         struct obd_type *type;
102
103         spin_lock(&obd_types_lock);
104         list_for_each(tmp, &obd_types) {
105                 type = list_entry(tmp, struct obd_type, typ_chain);
106                 if (strcmp(type->typ_name, name) == 0) {
107                         spin_unlock(&obd_types_lock);
108                         return type;
109                 }
110         }
111         spin_unlock(&obd_types_lock);
112         return NULL;
113 }
114
115 struct obd_type *class_get_type(const char *name)
116 {
117         struct obd_type *type = class_search_type(name);
118
119 #ifdef CONFIG_KMOD
120         if (!type) {
121                 const char *modname = name;
122                 if (strcmp(modname, LUSTRE_MDT_NAME) == 0)
123                         modname = LUSTRE_MDS_NAME;
124                 if (!request_module(modname)) {
125                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
126                         type = class_search_type(name);
127                 } else {
128                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
129                                            modname);
130                 }
131         }
132 #endif
133         if (type) {
134                 spin_lock(&type->obd_type_lock);
135                 type->typ_refcnt++;
136                 try_module_get(type->typ_ops->o_owner);
137                 spin_unlock(&type->obd_type_lock);
138         }
139         return type;
140 }
141
142 void class_put_type(struct obd_type *type)
143 {
144         LASSERT(type);
145         spin_lock(&type->obd_type_lock);
146         type->typ_refcnt--;
147         module_put(type->typ_ops->o_owner);
148         spin_unlock(&type->obd_type_lock);
149 }
150
151 int class_register_type(struct obd_ops *ops, struct lprocfs_vars *vars,
152                         const char *name)
153 {
154         struct obd_type *type;
155         int rc = 0;
156         ENTRY;
157
158         LASSERT(strnlen(name, 1024) < 1024);    /* sanity check */
159
160         if (class_search_type(name)) {
161                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
162                 RETURN(-EEXIST);
163         }
164
165         rc = -ENOMEM;
166         OBD_ALLOC(type, sizeof(*type));
167         if (type == NULL)
168                 RETURN(rc);
169
170         OBD_ALLOC(type->typ_ops, sizeof(*type->typ_ops));
171         OBD_ALLOC(type->typ_name, strlen(name) + 1);
172         if (type->typ_ops == NULL || type->typ_name == NULL)
173                 GOTO (failed, rc);
174
175         *(type->typ_ops) = *ops;
176         strcpy(type->typ_name, name);
177         spin_lock_init(&type->obd_type_lock);
178
179 #ifdef LPROCFS
180         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
181                                               vars, type);
182         if (IS_ERR(type->typ_procroot)) {
183                 rc = PTR_ERR(type->typ_procroot);
184                 type->typ_procroot = NULL;
185                 GOTO (failed, rc);
186         }
187 #endif
188
189         spin_lock(&obd_types_lock);
190         list_add(&type->typ_chain, &obd_types);
191         spin_unlock(&obd_types_lock);
192
193         RETURN (0);
194
195  failed:
196         if (type->typ_name != NULL)
197                 OBD_FREE(type->typ_name, strlen(name) + 1);
198         if (type->typ_ops != NULL)
199                 OBD_FREE (type->typ_ops, sizeof (*type->typ_ops));
200         OBD_FREE(type, sizeof(*type));
201         RETURN(rc);
202 }
203
204 int class_unregister_type(const char *name)
205 {
206         struct obd_type *type = class_search_type(name);
207         ENTRY;
208
209         if (!type) {
210                 CERROR("unknown obd type\n");
211                 RETURN(-EINVAL);
212         }
213
214         if (type->typ_refcnt) {
215                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
216                 /* This is a bad situation, let's make the best of it */
217                 /* Remove ops, but leave the name for debugging */
218                 OBD_FREE(type->typ_ops, sizeof(*type->typ_ops));
219                 RETURN(-EBUSY);
220         }
221
222         if (type->typ_procroot)
223                 lprocfs_remove(&type->typ_procroot);
224
225         spin_lock(&obd_types_lock);
226         list_del(&type->typ_chain);
227         spin_unlock(&obd_types_lock);
228         OBD_FREE(type->typ_name, strlen(name) + 1);
229         if (type->typ_ops != NULL)
230                 OBD_FREE(type->typ_ops, sizeof(*type->typ_ops));
231         OBD_FREE(type, sizeof(*type));
232         RETURN(0);
233 } /* class_unregister_type */
234
235 /**
236  * Create a new obd device.
237  *
238  * Find an empty slot in ::obd_devs[], create a new obd device in it.
239  *
240  * \param typename [in] obd device type string.
241  * \param name     [in] obd device name.
242  *
243  * \retval NULL if create fails, otherwise return the obd device
244  *         pointer created.
245  */
246 struct obd_device *class_newdev(const char *type_name, const char *name)
247 {
248         struct obd_device *result = NULL;
249         struct obd_device *newdev;
250         struct obd_type *type = NULL;
251         int i;
252         int new_obd_minor = 0;
253
254         if (strlen(name) >= MAX_OBD_NAME) {
255                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
256                 RETURN(ERR_PTR(-EINVAL));
257         }
258
259         type = class_get_type(type_name);
260         if (type == NULL){
261                 CERROR("OBD: unknown type: %s\n", type_name);
262                 RETURN(ERR_PTR(-ENODEV));
263         }
264
265         newdev = obd_device_alloc();
266         if (newdev == NULL) {
267                 class_put_type(type);
268                 RETURN(ERR_PTR(-ENOMEM));
269         }
270         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
271
272         spin_lock(&obd_dev_lock);
273         for (i = 0; i < class_devno_max(); i++) {
274                 struct obd_device *obd = class_num2obd(i);
275                 if (obd && obd->obd_name && (strcmp(name, obd->obd_name) == 0)){
276                         CERROR("Device %s already exists, won't add\n", name);
277                         if (result) {
278                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
279                                          "%p obd_magic %08x != %08x\n", result,
280                                          result->obd_magic, OBD_DEVICE_MAGIC);
281                                 LASSERTF(result->obd_minor == new_obd_minor,
282                                          "%p obd_minor %d != %d\n", result,
283                                          result->obd_minor, new_obd_minor);
284
285                                 obd_devs[result->obd_minor] = NULL;
286                                 result->obd_name[0]='\0';
287                         }
288                         result = ERR_PTR(-EEXIST);
289                         break;
290                 }
291                 if (!result && !obd) {
292                         result = newdev;
293                         result->obd_minor = i;
294                         new_obd_minor = i;
295                         result->obd_type = type;
296                         strncpy(result->obd_name, name,
297                                 sizeof(result->obd_name) - 1);
298                         obd_devs[i] = result;
299                 }
300         }
301         spin_unlock(&obd_dev_lock);
302
303         if (result == NULL && i >= class_devno_max()) {
304                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
305                        class_devno_max());
306                 result = ERR_PTR(-EOVERFLOW);
307         }
308
309         if (IS_ERR(result)) {
310                 obd_device_free(newdev);
311                 class_put_type(type);
312         } else {
313                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
314                        result->obd_name, result);
315         }
316         return result;
317 }
318
319 void class_release_dev(struct obd_device *obd)
320 {
321         struct obd_type *obd_type = obd->obd_type;
322
323         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
324                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
325         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
326                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
327         LASSERT(obd_type != NULL);
328
329         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
330                obd->obd_name,obd->obd_type->typ_name);
331
332         spin_lock(&obd_dev_lock);
333         obd_devs[obd->obd_minor] = NULL;
334         spin_unlock(&obd_dev_lock);
335         obd_device_free(obd);
336
337         class_put_type(obd_type);
338 }
339
340 int class_name2dev(const char *name)
341 {
342         int i;
343
344         if (!name)
345                 return -1;
346
347         spin_lock(&obd_dev_lock);
348         for (i = 0; i < class_devno_max(); i++) {
349                 struct obd_device *obd = class_num2obd(i);
350                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
351                         /* Make sure we finished attaching before we give
352                            out any references */
353                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
354                         if (obd->obd_attached) {
355                                 spin_unlock(&obd_dev_lock);
356                                 return i;
357                         }
358                         break;
359                 }
360         }
361         spin_unlock(&obd_dev_lock);
362
363         return -1;
364 }
365
366 struct obd_device *class_name2obd(const char *name)
367 {
368         int dev = class_name2dev(name);
369
370         if (dev < 0 || dev > class_devno_max())
371                 return NULL;
372         return class_num2obd(dev);
373 }
374
375 int class_uuid2dev(struct obd_uuid *uuid)
376 {
377         int i;
378
379         spin_lock(&obd_dev_lock);
380         for (i = 0; i < class_devno_max(); i++) {
381                 struct obd_device *obd = class_num2obd(i);
382                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
383                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
384                         spin_unlock(&obd_dev_lock);
385                         return i;
386                 }
387         }
388         spin_unlock(&obd_dev_lock);
389
390         return -1;
391 }
392
393 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
394 {
395         int dev = class_uuid2dev(uuid);
396         if (dev < 0)
397                 return NULL;
398         return class_num2obd(dev);
399 }
400
401 /**
402  * Get obd device from ::obd_devs[]
403  *
404  * \param num [in] array index
405  *
406  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
407  *         otherwise return the obd device there.
408  */
409 struct obd_device *class_num2obd(int num)
410 {
411         struct obd_device *obd = NULL;
412
413         if (num < class_devno_max()) {
414                 obd = obd_devs[num];
415                 if (obd == NULL)
416                         return NULL;
417
418                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
419                          "%p obd_magic %08x != %08x\n",
420                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
421                 LASSERTF(obd->obd_minor == num,
422                          "%p obd_minor %0d != %0d\n",
423                          obd, obd->obd_minor, num);
424         }
425
426         return obd;
427 }
428
429 void class_obd_list(void)
430 {
431         char *status;
432         int i;
433
434         spin_lock(&obd_dev_lock);
435         for (i = 0; i < class_devno_max(); i++) {
436                 struct obd_device *obd = class_num2obd(i);
437                 if (obd == NULL)
438                         continue;
439                 if (obd->obd_stopping)
440                         status = "ST";
441                 else if (obd->obd_set_up)
442                         status = "UP";
443                 else if (obd->obd_attached)
444                         status = "AT";
445                 else
446                         status = "--";
447                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
448                          i, status, obd->obd_type->typ_name,
449                          obd->obd_name, obd->obd_uuid.uuid,
450                          atomic_read(&obd->obd_refcount));
451         }
452         spin_unlock(&obd_dev_lock);
453         return;
454 }
455
456 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
457    specified, then only the client with that uuid is returned,
458    otherwise any client connected to the tgt is returned. */
459 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
460                                           const char * typ_name,
461                                           struct obd_uuid *grp_uuid)
462 {
463         int i;
464
465         spin_lock(&obd_dev_lock);
466         for (i = 0; i < class_devno_max(); i++) {
467                 struct obd_device *obd = class_num2obd(i);
468                 if (obd == NULL)
469                         continue;
470                 if ((strncmp(obd->obd_type->typ_name, typ_name,
471                              strlen(typ_name)) == 0)) {
472                         if (obd_uuid_equals(tgt_uuid,
473                                             &obd->u.cli.cl_target_uuid) &&
474                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
475                                                          &obd->obd_uuid) : 1)) {
476                                 spin_unlock(&obd_dev_lock);
477                                 return obd;
478                         }
479                 }
480         }
481         spin_unlock(&obd_dev_lock);
482
483         return NULL;
484 }
485
486 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
487                                             struct obd_uuid *grp_uuid)
488 {
489         struct obd_device *obd;
490
491         obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
492         if (!obd)
493                 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
494                                             grp_uuid);
495         return obd;
496 }
497
498 /* Iterate the obd_device list looking devices have grp_uuid. Start
499    searching at *next, and if a device is found, the next index to look
500    at is saved in *next. If next is NULL, then the first matching device
501    will always be returned. */
502 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
503 {
504         int i;
505
506         if (next == NULL)
507                 i = 0;
508         else if (*next >= 0 && *next < class_devno_max())
509                 i = *next;
510         else
511                 return NULL;
512
513         spin_lock(&obd_dev_lock);
514         for (; i < class_devno_max(); i++) {
515                 struct obd_device *obd = class_num2obd(i);
516                 if (obd == NULL)
517                         continue;
518                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
519                         if (next != NULL)
520                                 *next = i+1;
521                         spin_unlock(&obd_dev_lock);
522                         return obd;
523                 }
524         }
525         spin_unlock(&obd_dev_lock);
526
527         return NULL;
528 }
529
530
531 void obd_cleanup_caches(void)
532 {
533         int rc;
534
535         ENTRY;
536         if (obd_device_cachep) {
537                 rc = cfs_mem_cache_destroy(obd_device_cachep);
538                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
539                 obd_device_cachep = NULL;
540         }
541         if (obdo_cachep) {
542                 rc = cfs_mem_cache_destroy(obdo_cachep);
543                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
544                 obdo_cachep = NULL;
545         }
546         if (import_cachep) {
547                 rc = cfs_mem_cache_destroy(import_cachep);
548                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
549                 import_cachep = NULL;
550         }
551         EXIT;
552 }
553
554 int obd_init_caches(void)
555 {
556         ENTRY;
557
558         LASSERT(obd_device_cachep == NULL);
559         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
560                                                  sizeof(struct obd_device),
561                                                  0, 0);
562         if (!obd_device_cachep)
563                 GOTO(out, -ENOMEM);
564
565         LASSERT(obdo_cachep == NULL);
566         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
567                                            0, 0);
568         if (!obdo_cachep)
569                 GOTO(out, -ENOMEM);
570
571         LASSERT(import_cachep == NULL);
572         import_cachep = cfs_mem_cache_create("ll_import_cache",
573                                              sizeof(struct obd_import),
574                                              0, 0);
575         if (!import_cachep)
576                 GOTO(out, -ENOMEM);
577
578         RETURN(0);
579  out:
580         obd_cleanup_caches();
581         RETURN(-ENOMEM);
582
583 }
584
585 /* map connection to client */
586 struct obd_export *class_conn2export(struct lustre_handle *conn)
587 {
588         struct obd_export *export;
589         ENTRY;
590
591         if (!conn) {
592                 CDEBUG(D_CACHE, "looking for null handle\n");
593                 RETURN(NULL);
594         }
595
596         if (conn->cookie == -1) {  /* this means assign a new connection */
597                 CDEBUG(D_CACHE, "want a new connection\n");
598                 RETURN(NULL);
599         }
600
601         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
602         export = class_handle2object(conn->cookie);
603         RETURN(export);
604 }
605
606 struct obd_device *class_exp2obd(struct obd_export *exp)
607 {
608         if (exp)
609                 return exp->exp_obd;
610         return NULL;
611 }
612
613 struct obd_device *class_conn2obd(struct lustre_handle *conn)
614 {
615         struct obd_export *export;
616         export = class_conn2export(conn);
617         if (export) {
618                 struct obd_device *obd = export->exp_obd;
619                 class_export_put(export);
620                 return obd;
621         }
622         return NULL;
623 }
624
625 struct obd_import *class_exp2cliimp(struct obd_export *exp)
626 {
627         struct obd_device *obd = exp->exp_obd;
628         if (obd == NULL)
629                 return NULL;
630         return obd->u.cli.cl_import;
631 }
632
633 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
634 {
635         struct obd_device *obd = class_conn2obd(conn);
636         if (obd == NULL)
637                 return NULL;
638         return obd->u.cli.cl_import;
639 }
640
641 /* Export management functions */
/*
 * Addref callback handed to class_handle_hash() for export handles:
 * takes one reference on the export passed as an untyped pointer.
 */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
646
647 /* called from mds_commit_cb() in context of journal commit callback
648  * and cannot call any blocking functions. */
649 void __class_export_put(struct obd_export *exp)
650 {
651         if (atomic_dec_and_test(&exp->exp_refcount)) {
652                 LASSERT (list_empty(&exp->exp_obd_chain));
653
654                 CDEBUG(D_IOCTL, "final put %p/%s\n",
655                        exp, exp->exp_client_uuid.uuid);
656
657                 spin_lock(&obd_zombie_impexp_lock);
658                 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
659                 spin_unlock(&obd_zombie_impexp_lock);
660
661                 obd_zombie_impexp_notify();
662         }
663 }
664 EXPORT_SYMBOL(__class_export_put);
665
666 void class_export_destroy(struct obd_export *exp)
667 {
668         struct obd_device *obd = exp->exp_obd;
669
670         LASSERT (atomic_read(&exp->exp_refcount) == 0);
671
672         CDEBUG(D_IOCTL, "destroying export %p/%s\n", exp,
673                exp->exp_client_uuid.uuid);
674
675         LASSERT(obd != NULL);
676
677         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
678         if (exp->exp_connection)
679                 ptlrpc_put_connection_superhack(exp->exp_connection);
680
681         LASSERT(list_empty(&exp->exp_outstanding_replies));
682         LASSERT(list_empty(&exp->exp_uncommitted_replies));
683         LASSERT(list_empty(&exp->exp_req_replay_queue));
684         LASSERT(list_empty(&exp->exp_queued_rpc));
685         obd_destroy_export(exp);
686
687         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
688         class_decref(obd);
689 }
690
691 /* Creates a new export, adds it to the hash table, and returns a
692  * pointer to it. The refcount is 2: one for the hash reference, and
693  * one for the pointer returned by this function. */
694 struct obd_export *class_new_export(struct obd_device *obd,
695                                     struct obd_uuid *cluuid)
696 {
697         struct obd_export *export;
698         int rc = 0;
699
700         OBD_ALLOC(export, sizeof(*export));
701         if (!export)
702                 return ERR_PTR(-ENOMEM);
703
704         export->exp_conn_cnt = 0;
705         export->exp_lock_hash = NULL;
706         atomic_set(&export->exp_refcount, 2);
707         atomic_set(&export->exp_rpc_count, 0);
708         export->exp_obd = obd;
709         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
710         spin_lock_init(&export->exp_uncommitted_replies_lock);
711         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
712         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
713         CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
714
715         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
716         class_handle_hash(&export->exp_handle, export_handle_addref);
717         export->exp_last_request_time = cfs_time_current_sec();
718         spin_lock_init(&export->exp_lock);
719         INIT_HLIST_NODE(&export->exp_uuid_hash);
720         INIT_HLIST_NODE(&export->exp_nid_hash);
721
722         export->exp_client_uuid = *cluuid;
723         obd_init_export(export);
724
725         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
726                 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
727                                             &export->exp_uuid_hash);
728                 if (rc != 0) {
729                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
730                                       obd->obd_name, cluuid->uuid, rc);
731                         class_handle_unhash(&export->exp_handle);
732                         OBD_FREE_PTR(export);
733                         return ERR_PTR(-EALREADY);
734                 }
735         }
736
737         spin_lock(&obd->obd_dev_lock);
738         LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
739         class_incref(obd);
740         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
741         list_add_tail(&export->exp_obd_chain_timed,
742                       &export->exp_obd->obd_exports_timed);
743         export->exp_obd->obd_num_exports++;
744         spin_unlock(&obd->obd_dev_lock);
745
746         return export;
747 }
748 EXPORT_SYMBOL(class_new_export);
749
750 void class_unlink_export(struct obd_export *exp)
751 {
752         class_handle_unhash(&exp->exp_handle);
753
754         spin_lock(&exp->exp_obd->obd_dev_lock);
755         /* delete an uuid-export hashitem from hashtables */
756         if (!hlist_unhashed(&exp->exp_uuid_hash))
757                 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
758                                 &exp->exp_client_uuid,
759                                 &exp->exp_uuid_hash);
760
761         list_del_init(&exp->exp_obd_chain);
762         list_del_init(&exp->exp_obd_chain_timed);
763         exp->exp_obd->obd_num_exports--;
764         spin_unlock(&exp->exp_obd->obd_dev_lock);
765         /* Keep these counter valid always */
766         spin_lock_bh(&exp->exp_obd->obd_processing_task_lock);
767         if (exp->exp_delayed)
768                 exp->exp_obd->obd_delayed_clients--;
769         else if (exp->exp_replay_needed)
770                 exp->exp_obd->obd_recoverable_clients--;
771         spin_unlock_bh(&exp->exp_obd->obd_processing_task_lock);
772         class_export_put(exp);
773 }
774 EXPORT_SYMBOL(class_unlink_export);
775
776 /* Import management functions */
/*
 * Addref callback handed to class_handle_hash() for import handles:
 * takes one reference on the import passed as an untyped pointer.
 */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
781
782 struct obd_import *class_import_get(struct obd_import *import)
783 {
784         LASSERT(atomic_read(&import->imp_refcount) >= 0);
785         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
786         atomic_inc(&import->imp_refcount);
787         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
788                atomic_read(&import->imp_refcount), 
789                import->imp_obd->obd_name);
790         return import;
791 }
792 EXPORT_SYMBOL(class_import_get);
793
794 void class_import_put(struct obd_import *import)
795 {
796         ENTRY;
797
798         LASSERT(atomic_read(&import->imp_refcount) > 0);
799         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
800         LASSERT(list_empty(&import->imp_zombie_chain));
801
802         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
803                atomic_read(&import->imp_refcount) - 1, 
804                import->imp_obd->obd_name);
805
806         if (atomic_dec_and_test(&import->imp_refcount)) {
807                 CDEBUG(D_INFO, "final put import %p\n", import);
808                 spin_lock(&obd_zombie_impexp_lock);
809                 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
810                 spin_unlock(&obd_zombie_impexp_lock);
811
812                 obd_zombie_impexp_notify();
813         }
814
815         EXIT;
816 }
817 EXPORT_SYMBOL(class_import_put);
818
819 void class_import_destroy(struct obd_import *import)
820 {
821         ENTRY;
822
823         CDEBUG(D_IOCTL, "destroying import %p\n", import);
824
825         LASSERT(atomic_read(&import->imp_refcount) == 0);
826
827         ptlrpc_put_connection_superhack(import->imp_connection);
828
829         while (!list_empty(&import->imp_conn_list)) {
830                 struct obd_import_conn *imp_conn;
831
832                 imp_conn = list_entry(import->imp_conn_list.next,
833                                       struct obd_import_conn, oic_item);
834                 list_del(&imp_conn->oic_item);
835                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
836                 OBD_FREE(imp_conn, sizeof(*imp_conn));
837         }
838
839         class_decref(import->imp_obd);
840         OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
841         EXIT;
842 }
843
844 static void init_imp_at(struct imp_at *at) {
845         int i;
846         at_init(&at->iat_net_latency, 0, 0);
847         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
848                 /* max service estimates are tracked on the server side, so
849                    don't use the AT history here, just use the last reported
850                    val. (But keep hist for proc histogram, worst_ever) */
851                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
852                         AT_FLG_NOHIST);
853         }
854 }
855
856 struct obd_import *class_new_import(struct obd_device *obd)
857 {
858         struct obd_import *imp;
859
860         OBD_ALLOC(imp, sizeof(*imp));
861         if (imp == NULL)
862                 return NULL;
863
864         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
865         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
866         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
867         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
868         spin_lock_init(&imp->imp_lock);
869         imp->imp_last_success_conn = 0;
870         imp->imp_state = LUSTRE_IMP_NEW;
871         imp->imp_obd = class_incref(obd);
872         cfs_waitq_init(&imp->imp_recovery_waitq);
873
874         atomic_set(&imp->imp_refcount, 2);
875         atomic_set(&imp->imp_unregistering, 0);
876         atomic_set(&imp->imp_inflight, 0);
877         atomic_set(&imp->imp_replay_inflight, 0);
878         atomic_set(&imp->imp_inval_count, 0);
879         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
880         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
881         class_handle_hash(&imp->imp_handle, import_handle_addref);
882         init_imp_at(&imp->imp_at);
883
884 /* b1_8 supports both v1 & v2. but HEAD only supports v2.
885  * So let's use v2.
886  */
887 #define HAVE_DEFAULT_V2_CONNECT 1
888 #ifdef HAVE_DEFAULT_V2_CONNECT
889         /* the default magic is V2, will be used in connect RPC, and
890          * then adjusted according to the flags in request/reply. */
891         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
892 #else
893         /* the default magic is V1, will be used in connect RPC, and
894          * then adjusted according to the flags in request/reply. */
895         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V1;
896 #endif
897
898         return imp;
899 }
900 EXPORT_SYMBOL(class_new_import);
901
902 void class_destroy_import(struct obd_import *import)
903 {
904         LASSERT(import != NULL);
905         LASSERT(import != LP_POISON);
906
907         class_handle_unhash(&import->imp_handle);
908
909         spin_lock(&import->imp_lock);
910         import->imp_generation++;
911         spin_unlock(&import->imp_lock);
912
913         class_import_put(import);
914 }
915 EXPORT_SYMBOL(class_destroy_import);
916
917 /* A connection defines an export context in which preallocation can
918    be managed. This releases the export pointer reference, and returns
919    the export handle, so the export refcount is 1 when this function
920    returns. */
921 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
922                   struct obd_uuid *cluuid)
923 {
924         struct obd_export *export;
925         LASSERT(conn != NULL);
926         LASSERT(obd != NULL);
927         LASSERT(cluuid != NULL);
928         ENTRY;
929
930         export = class_new_export(obd, cluuid);
931         if (IS_ERR(export))
932                 RETURN(PTR_ERR(export));
933
934         conn->cookie = export->exp_handle.h_cookie;
935         class_export_put(export);
936
937         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
938                cluuid->uuid, conn->cookie);
939         RETURN(0);
940 }
941 EXPORT_SYMBOL(class_connect);
942
943 /* This function removes 1-3 references from the export:
944  * 1 - for export pointer passed
945  * and if disconnect really need
946  * 2 - removing from hash
947  * 3 - in client_unlink_export
948  * The export pointer passed to this function can destroyed */
949 int class_disconnect(struct obd_export *export)
950 {
951         int already_disconnected;
952         ENTRY;
953
954         if (export == NULL) {
955                 fixme();
956                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
957                 RETURN(-EINVAL);
958         }
959
960         spin_lock(&export->exp_lock);
961         already_disconnected = export->exp_disconnected;
962         export->exp_disconnected = 1;
963         spin_unlock(&export->exp_lock);
964
965
966         /* class_cleanup(), abort_recovery(), and class_fail_export()
967          * all end up in here, and if any of them race we shouldn't
968          * call extra class_export_puts(). */
969         if (already_disconnected)
970                 GOTO(no_disconn, already_disconnected);
971
972         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
973                export->exp_handle.h_cookie);
974
975
976         if (!hlist_unhashed(&export->exp_nid_hash))
977                 lustre_hash_del(export->exp_obd->obd_nid_hash,
978                                 &export->exp_connection->c_peer.nid,
979                                 &export->exp_nid_hash);
980
981         class_unlink_export(export);
982
983 no_disconn:
984         class_export_put(export);
985         RETURN(0);
986 }
987
988 static void class_disconnect_export_list(struct list_head *list,
989                                          enum obd_option flags)
990 {
991         int rc;
992         struct obd_export *exp;
993         ENTRY;
994
995         /* It's possible that an export may disconnect itself, but
996          * nothing else will be added to this list. */
997         while (!list_empty(list)) {
998                 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
999                 /* need for safe call CDEBUG after obd_disconnect */
1000                 class_export_get(exp);
1001
1002                 spin_lock(&exp->exp_lock);
1003                 exp->exp_flags = flags;
1004                 spin_unlock(&exp->exp_lock);
1005
1006                 if (obd_uuid_equals(&exp->exp_client_uuid,
1007                                     &exp->exp_obd->obd_uuid)) {
1008                         CDEBUG(D_HA,
1009                                "exp %p export uuid == obd uuid, don't discon\n",
1010                                exp);
1011                         /* Need to delete this now so we don't end up pointing
1012                          * to work_list later when this export is cleaned up. */
1013                         list_del_init(&exp->exp_obd_chain);
1014                         class_export_put(exp);
1015                         continue;
1016                 }
1017
1018                 class_export_get(exp);
1019                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1020                        "last request at %ld\n",
1021                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1022                        exp, exp->exp_last_request_time);
1023
1024                 /* release one export reference anyway */
1025                 rc = obd_disconnect(exp);
1026                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1027                        obd_export_nid2str(exp), exp, rc);
1028                 class_export_put(exp);
1029         }
1030         EXIT;
1031 }
1032
1033 void class_disconnect_exports(struct obd_device *obd)
1034 {
1035         struct list_head work_list;
1036         ENTRY;
1037
1038         /* Move all of the exports from obd_exports to a work list, en masse. */
1039         CFS_INIT_LIST_HEAD(&work_list);
1040         spin_lock(&obd->obd_dev_lock);
1041         list_splice_init(&obd->obd_delayed_exports, &work_list);
1042         list_splice_init(&obd->obd_exports, &work_list);
1043         spin_unlock(&obd->obd_dev_lock);
1044
1045         CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1046                "disconnecting them\n", obd->obd_minor, obd);
1047         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd));
1048         EXIT;
1049 }
1050 EXPORT_SYMBOL(class_disconnect_exports);
1051
1052 /* Remove exports that have not completed recovery. */
1053 void class_disconnect_stale_exports(struct obd_device *obd,
1054                                     enum obd_option flags)
1055 {
1056         struct list_head work_list;
1057         struct list_head *pos, *n;
1058         struct obd_export *exp;
1059         ENTRY;
1060
1061         CFS_INIT_LIST_HEAD(&work_list);
1062         spin_lock(&obd->obd_dev_lock);
1063         list_for_each_safe(pos, n, &obd->obd_exports) {
1064                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1065                 if (exp->exp_replay_needed) {
1066                         list_move(&exp->exp_obd_chain, &work_list);
1067                         obd->obd_stale_clients++;
1068                 }
1069         }
1070         spin_unlock(&obd->obd_dev_lock);
1071
1072         CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1073                obd->obd_name, obd->obd_stale_clients);
1074         class_disconnect_export_list(&work_list, flags);
1075         EXIT;
1076 }
1077 EXPORT_SYMBOL(class_disconnect_stale_exports);
1078
1079 void class_disconnect_expired_exports(struct obd_device *obd)
1080 {
1081         struct list_head expired_list;
1082         struct obd_export *exp, *n;
1083         int cnt = 0;
1084         ENTRY;
1085
1086         CFS_INIT_LIST_HEAD(&expired_list);
1087         spin_lock(&obd->obd_dev_lock);
1088         list_for_each_entry_safe(exp, n, &obd->obd_delayed_exports,
1089                                  exp_obd_chain) {
1090                 if (exp_expired(exp, obd->u.obt.obt_stale_export_age)) {
1091                         list_move(&exp->exp_obd_chain, &expired_list);
1092                         cnt++;
1093                 }
1094         }
1095         spin_unlock(&obd->obd_dev_lock);
1096
1097         if (cnt == 0)
1098                 return;
1099
1100         CDEBUG(D_INFO, "%s: disconnecting %d expired exports\n",
1101                obd->obd_name, cnt);
1102         class_disconnect_export_list(&expired_list, exp_flags_from_obd(obd));
1103
1104         EXIT;
1105 }
1106 EXPORT_SYMBOL(class_disconnect_expired_exports);
1107
1108 void class_set_export_delayed(struct obd_export *exp)
1109 {
1110         struct obd_device *obd = class_exp2obd(exp);
1111
1112         LASSERT(!exp->exp_delayed);
1113         spin_lock(&exp->exp_lock);
1114         exp->exp_delayed = 1;
1115         spin_unlock(&exp->exp_lock);
1116
1117         /* no need to ping delayed exports */
1118         spin_lock(&obd->obd_dev_lock);
1119         list_del_init(&exp->exp_obd_chain_timed);
1120         list_move_tail(&exp->exp_obd_chain, &obd->obd_delayed_exports);
1121         spin_unlock(&obd->obd_dev_lock);
1122
1123         LASSERT(obd->obd_recoverable_clients > 0);
1124
1125         spin_lock_bh(&obd->obd_processing_task_lock);
1126         obd->obd_delayed_clients++;
1127         obd->obd_recoverable_clients--;
1128         spin_unlock_bh(&obd->obd_processing_task_lock);
1129
1130         CDEBUG(D_HA, "%s: set client %s as delayed\n",
1131                obd->obd_name, exp->exp_client_uuid.uuid);
1132 }
1133 EXPORT_SYMBOL(class_set_export_delayed);
1134
1135 /*
1136  * Manage exports that have not completed recovery.
1137  */
1138 void class_handle_stale_exports(struct obd_device *obd)
1139 {
1140         struct list_head delay_list, evict_list;
1141         struct obd_export *exp, *n;
1142         int delayed = 0;
1143         ENTRY;
1144
1145         CFS_INIT_LIST_HEAD(&delay_list);
1146         CFS_INIT_LIST_HEAD(&evict_list);
1147         spin_lock(&obd->obd_dev_lock);
1148         list_for_each_entry_safe(exp, n, &obd->obd_exports, exp_obd_chain) {
1149                 LASSERT(!exp->exp_delayed);
1150                 /* clients finished recovery */
1151                 if (!exp->exp_replay_needed)
1152                         continue;
1153                 /* connected non-vbr clients are evicted */
1154                 if (exp->exp_in_recovery && !exp_connect_vbr(exp)) {
1155                         obd->obd_stale_clients++;
1156                         list_move_tail(&exp->exp_obd_chain, &evict_list);
1157                         continue;
1158                 }
1159                 if (obd->obd_version_recov || !exp->exp_in_recovery) {
1160                         list_move_tail(&exp->exp_obd_chain, &delay_list);
1161                         delayed++;
1162                 }
1163         }
1164 #ifndef HAVE_DELAYED_RECOVERY
1165         /* delayed recovery is turned off, evict all delayed exports */
1166         list_splice_init(&delay_list, &evict_list);
1167         list_splice_init(&obd->obd_delayed_exports, &evict_list);
1168         obd->obd_stale_clients += delayed;
1169 #endif
1170         spin_unlock(&obd->obd_dev_lock);
1171
1172         list_for_each_entry_safe(exp, n, &delay_list, exp_obd_chain) {
1173                 class_set_export_delayed(exp);
1174                 exp->exp_last_request_time = cfs_time_current_sec();
1175         }
1176         LASSERT(list_empty(&delay_list));
1177
1178         /* evict clients without VBR support */
1179         class_disconnect_export_list(&evict_list, exp_flags_from_obd(obd));
1180
1181         EXIT;
1182 }
1183 EXPORT_SYMBOL(class_handle_stale_exports);
1184
1185 int oig_init(struct obd_io_group **oig_out)
1186 {
1187         struct obd_io_group *oig;
1188         ENTRY;
1189
1190         OBD_ALLOC(oig, sizeof(*oig));
1191         if (oig == NULL)
1192                 RETURN(-ENOMEM);
1193
1194         spin_lock_init(&oig->oig_lock);
1195         oig->oig_rc = 0;
1196         oig->oig_pending = 0;
1197         atomic_set(&oig->oig_refcount, 1);
1198         cfs_waitq_init(&oig->oig_waitq);
1199         CFS_INIT_LIST_HEAD(&oig->oig_occ_list);
1200
1201         *oig_out = oig;
1202         RETURN(0);
1203 };
1204 EXPORT_SYMBOL(oig_init);
1205
1206 static inline void oig_grab(struct obd_io_group *oig)
1207 {
1208         atomic_inc(&oig->oig_refcount);
1209 }
1210
1211 void oig_release(struct obd_io_group *oig)
1212 {
1213         if (atomic_dec_and_test(&oig->oig_refcount))
1214                 OBD_FREE(oig, sizeof(*oig));
1215 }
1216 EXPORT_SYMBOL(oig_release);
1217
1218 int oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ)
1219 {
1220         int rc = 0;
1221         CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
1222         spin_lock(&oig->oig_lock);
1223         if (oig->oig_rc) {
1224                 rc = oig->oig_rc;
1225         } else {
1226                 oig->oig_pending++;
1227                 if (occ != NULL)
1228                         list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
1229         }
1230         spin_unlock(&oig->oig_lock);
1231         oig_grab(oig);
1232
1233         return rc;
1234 }
1235 EXPORT_SYMBOL(oig_add_one);
1236
1237 void oig_complete_one(struct obd_io_group *oig,
1238                       struct oig_callback_context *occ, int rc)
1239 {
1240         cfs_waitq_t *wake = NULL;
1241         int old_rc;
1242
1243         spin_lock(&oig->oig_lock);
1244
1245         if (occ != NULL)
1246                 list_del_init(&occ->occ_oig_item);
1247
1248         old_rc = oig->oig_rc;
1249         if (oig->oig_rc == 0 && rc != 0)
1250                 oig->oig_rc = rc;
1251
1252         if (--oig->oig_pending <= 0)
1253                 wake = &oig->oig_waitq;
1254
1255         spin_unlock(&oig->oig_lock);
1256
1257         CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
1258                         "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
1259                         oig->oig_pending);
1260         if (wake)
1261                 cfs_waitq_signal(wake);
1262         oig_release(oig);
1263 }
1264 EXPORT_SYMBOL(oig_complete_one);
1265
1266 static int oig_done(struct obd_io_group *oig)
1267 {
1268         int rc = 0;
1269         spin_lock(&oig->oig_lock);
1270         if (oig->oig_pending <= 0)
1271                 rc = 1;
1272         spin_unlock(&oig->oig_lock);
1273         return rc;
1274 }
1275
1276 static void interrupted_oig(void *data)
1277 {
1278         struct obd_io_group *oig = data;
1279         struct oig_callback_context *occ;
1280
1281         spin_lock(&oig->oig_lock);
1282         /* We need to restart the processing each time we drop the lock, as
1283          * it is possible other threads called oig_complete_one() to remove
1284          * an entry elsewhere in the list while we dropped lock.  We need to
1285          * drop the lock because osc_ap_completion() calls oig_complete_one()
1286          * which re-gets this lock ;-) as well as a lock ordering issue. */
1287 restart:
1288         list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
1289                 if (occ->interrupted)
1290                         continue;
1291                 occ->interrupted = 1;
1292                 spin_unlock(&oig->oig_lock);
1293                 occ->occ_interrupted(occ);
1294                 spin_lock(&oig->oig_lock);
1295                 goto restart;
1296         }
1297         spin_unlock(&oig->oig_lock);
1298 }
1299
1300 int oig_wait(struct obd_io_group *oig)
1301 {
1302         struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
1303         int rc;
1304
1305         CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
1306
1307         do {
1308                 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
1309                 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
1310                 /* we can't continue until the oig has emptied and stopped
1311                  * referencing state that the caller will free upon return */
1312                 if (rc == -EINTR)
1313                         lwi = (struct l_wait_info){ 0, };
1314         } while (rc == -EINTR);
1315
1316         LASSERTF(oig->oig_pending == 0,
1317                  "exiting oig_wait(oig = %p) with %d pending\n", oig,
1318                  oig->oig_pending);
1319
1320         CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
1321         return oig->oig_rc;
1322 }
1323 EXPORT_SYMBOL(oig_wait);
1324
1325 void class_fail_export(struct obd_export *exp)
1326 {
1327         int rc, already_failed;
1328
1329         spin_lock(&exp->exp_lock);
1330         already_failed = exp->exp_failed;
1331         exp->exp_failed = 1;
1332         spin_unlock(&exp->exp_lock);
1333
1334         if (already_failed) {
1335                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1336                        exp, exp->exp_client_uuid.uuid);
1337                 return;
1338         }
1339
1340         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1341                exp, exp->exp_client_uuid.uuid);
1342
1343         if (obd_dump_on_timeout)
1344                 libcfs_debug_dumplog();
1345
1346         /* Most callers into obd_disconnect are removing their own reference
1347          * (request, for example) in addition to the one from the hash table.
1348          * We don't have such a reference here, so make one. */
1349         class_export_get(exp);
1350         rc = obd_disconnect(exp);
1351         if (rc)
1352                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1353         else
1354                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1355                        exp, exp->exp_client_uuid.uuid);
1356 }
1357 EXPORT_SYMBOL(class_fail_export);
1358
1359 char *obd_export_nid2str(struct obd_export *exp)
1360 {
1361         if (exp->exp_connection != NULL)
1362                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1363
1364         return "(no nid)";
1365 }
1366 EXPORT_SYMBOL(obd_export_nid2str);
1367
1368 int obd_export_evict_by_nid(struct obd_device *obd, char *nid)
1369 {
1370         struct obd_export *doomed_exp = NULL;
1371         int exports_evicted = 0;
1372
1373         lnet_nid_t nid_key = libcfs_str2nid(nid);
1374
1375         do {
1376                 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1377
1378                 if (doomed_exp == NULL)
1379                         break;
1380
1381                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1382                          "nid %s found, wanted nid %s, requested nid %s\n",
1383                          obd_export_nid2str(doomed_exp),
1384                          libcfs_nid2str(nid_key), nid);
1385
1386                 exports_evicted++;
1387                 CDEBUG(D_HA, "%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1388                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1389                        exports_evicted);
1390                 class_fail_export(doomed_exp);
1391                 class_export_put(doomed_exp);
1392         } while (1);
1393
1394         if (!exports_evicted)
1395                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1396                        obd->obd_name, nid);
1397         return exports_evicted;
1398 }
1399 EXPORT_SYMBOL(obd_export_evict_by_nid);
1400
1401 int obd_export_evict_by_uuid(struct obd_device *obd, char *uuid)
1402 {
1403         struct obd_export *doomed_exp = NULL;
1404         struct obd_uuid doomed_uuid;
1405         int exports_evicted = 0;
1406
1407         obd_str2uuid(&doomed_uuid, uuid);
1408         if(obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1409                 CERROR("%s: can't evict myself\n", obd->obd_name);
1410                 return exports_evicted;
1411         }
1412
1413         doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1414
1415         if (doomed_exp == NULL) {
1416                 CERROR("%s: can't disconnect %s: no exports found\n",
1417                        obd->obd_name, uuid);
1418         } else {
1419                 CWARN("%s: evicting %s at adminstrative request\n",
1420                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1421                 class_fail_export(doomed_exp);
1422                 class_export_put(doomed_exp);
1423                 exports_evicted++;
1424         }
1425
1426         return exports_evicted;
1427 }
1428 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1429
1430 void obd_zombie_impexp_cull(void)
1431 {
1432         struct obd_import *import;
1433         struct obd_export *export;
1434
1435         do {
1436                 spin_lock (&obd_zombie_impexp_lock);
1437
1438                 import = NULL;
1439                 if (!list_empty(&obd_zombie_imports)) {
1440                         import = list_entry(obd_zombie_imports.next,
1441                                             struct obd_import,
1442                                             imp_zombie_chain);
1443                         list_del(&import->imp_zombie_chain);
1444                 }
1445
1446                 export = NULL;
1447                 if (!list_empty(&obd_zombie_exports)) {
1448                         export = list_entry(obd_zombie_exports.next,
1449                                             struct obd_export,
1450                                             exp_obd_chain);
1451                         list_del_init(&export->exp_obd_chain);
1452                 }
1453
1454                 spin_unlock(&obd_zombie_impexp_lock);
1455
1456                 if (import != NULL)
1457                         class_import_destroy(import);
1458
1459                 if (export != NULL)
1460                         class_export_destroy(export);
1461
1462         } while (import != NULL || export != NULL);
1463 }
1464
1465 static struct completion        obd_zombie_start;
1466 static struct completion        obd_zombie_stop;
1467 static unsigned long            obd_zombie_flags;
1468 static cfs_waitq_t              obd_zombie_waitq;
1469
1470 enum {
1471         OBD_ZOMBIE_STOP = 1
1472 };
1473
1474 int obd_zombi_impexp_check(void *arg)
1475 {
1476         int rc;
1477
1478         spin_lock(&obd_zombie_impexp_lock);
1479         rc = list_empty(&obd_zombie_imports) &&
1480              list_empty(&obd_zombie_exports) &&
1481              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1482
1483         spin_unlock(&obd_zombie_impexp_lock);
1484
1485         RETURN(rc);
1486 }
1487
1488 static void obd_zombie_impexp_notify(void)
1489 {
1490         cfs_waitq_signal(&obd_zombie_waitq);
1491 }
1492
1493 #ifdef __KERNEL__
1494
1495 static int obd_zombie_impexp_thread(void *unused)
1496 {
1497         int rc;
1498
1499         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1500                 complete(&obd_zombie_start);
1501                 RETURN(rc);
1502         }
1503
1504         complete(&obd_zombie_start);
1505
1506         while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1507                 struct l_wait_info lwi = { 0 };
1508
1509                 l_wait_event(obd_zombie_waitq, !obd_zombi_impexp_check(NULL), &lwi);
1510
1511                 obd_zombie_impexp_cull();
1512         }
1513
1514         complete(&obd_zombie_stop);
1515
1516         RETURN(0);
1517 }
1518
1519 #else /* ! KERNEL */
1520
1521 static atomic_t zombi_recur = ATOMIC_INIT(0);
1522 static void *obd_zombi_impexp_work_cb;
1523 static void *obd_zombi_impexp_idle_cb;
1524
1525 int obd_zombi_impexp_kill(void *arg)
1526 {
1527         int rc = 0;
1528
1529         if (atomic_inc_return(&zombi_recur) == 1) {
1530                 obd_zombie_impexp_cull();
1531                 rc = 1;
1532         }
1533         atomic_dec(&zombi_recur);
1534         return rc;
1535 }
1536
1537 #endif
1538
1539 int obd_zombie_impexp_init(void)
1540 {
1541         int rc;
1542
1543         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1544         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1545         spin_lock_init(&obd_zombie_impexp_lock);
1546         init_completion(&obd_zombie_start);
1547         init_completion(&obd_zombie_stop);
1548         cfs_waitq_init(&obd_zombie_waitq);
1549
1550 #ifdef __KERNEL__
1551         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1552         if (rc < 0)
1553                 RETURN(rc);
1554
1555         wait_for_completion(&obd_zombie_start);
1556 #else
1557
1558         obd_zombi_impexp_work_cb =
1559                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1560                                                  &obd_zombi_impexp_kill, NULL);
1561
1562         obd_zombi_impexp_idle_cb =
1563                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1564                                                  &obd_zombi_impexp_check, NULL);
1565         rc = 0;
1566
1567 #endif
1568         RETURN(rc);
1569 }
1570
1571 void obd_zombie_impexp_stop(void)
1572 {
1573         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1574         obd_zombie_impexp_notify();
1575 #ifdef __KERNEL__
1576         wait_for_completion(&obd_zombie_stop);
1577 #else
1578         liblustre_deregister_wait_callback(obd_zombi_impexp_work_cb);
1579         liblustre_deregister_idle_callback(obd_zombi_impexp_idle_cb);
1580 #endif
1581 }