Whamcloud - gitweb
0bf5fd58becf05449c2fb998a19a9673ffa0fc74
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
50 #include <lustre_export.h>
51
52 extern struct list_head obd_types;
53 spinlock_t obd_types_lock;
54
55 cfs_mem_cache_t *obd_device_cachep;
56 cfs_mem_cache_t *obdo_cachep;
57 EXPORT_SYMBOL(obdo_cachep);
58 cfs_mem_cache_t *import_cachep;
59
60 struct list_head  obd_zombie_imports;
61 struct list_head  obd_zombie_exports;
62 spinlock_t        obd_zombie_impexp_lock;
63 static void obd_zombie_impexp_notify(void);
64
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
66
67 /*
68  * support functions: we could use inter-module communication, but this
69  * is more portable to other OS's
70  */
71 static struct obd_device *obd_device_alloc(void)
72 {
73         struct obd_device *obd;
74
75         OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
76         if (obd != NULL) {
77                 obd->obd_magic = OBD_DEVICE_MAGIC;
78         }
79         return obd;
80 }
81 EXPORT_SYMBOL(obd_device_alloc);
82
83 static void obd_device_free(struct obd_device *obd)
84 {
85         LASSERT(obd != NULL);
86         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic "
87                  "%08x != %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88         if (obd->obd_namespace != NULL) {
89                 CERROR("obd %p: namespace %p was not properly cleaned up "
90                        "(obd_force=%d)!\n",
91                        obd, obd->obd_namespace, obd->obd_force);
92                 LBUG();
93         }
94         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
95 }
96 EXPORT_SYMBOL(obd_device_free);
97
98 struct obd_type *class_search_type(const char *name)
99 {
100         struct list_head *tmp;
101         struct obd_type *type;
102
103         spin_lock(&obd_types_lock);
104         list_for_each(tmp, &obd_types) {
105                 type = list_entry(tmp, struct obd_type, typ_chain);
106                 if (strcmp(type->typ_name, name) == 0) {
107                         spin_unlock(&obd_types_lock);
108                         return type;
109                 }
110         }
111         spin_unlock(&obd_types_lock);
112         return NULL;
113 }
114
115 struct obd_type *class_get_type(const char *name)
116 {
117         struct obd_type *type = class_search_type(name);
118
119 #ifdef CONFIG_KMOD
120         if (!type) {
121                 const char *modname = name;
122                 if (strcmp(modname, LUSTRE_MDT_NAME) == 0)
123                         modname = LUSTRE_MDS_NAME;
124                 if (!request_module("%s", modname)) {
125                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
126                         type = class_search_type(name);
127                 } else {
128                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
129                                            modname);
130                 }
131         }
132 #endif
133         if (type) {
134                 spin_lock(&type->obd_type_lock);
135                 type->typ_refcnt++;
136                 try_module_get(type->typ_ops->o_owner);
137                 spin_unlock(&type->obd_type_lock);
138         }
139         return type;
140 }
141
142 void class_put_type(struct obd_type *type)
143 {
144         LASSERT(type);
145         spin_lock(&type->obd_type_lock);
146         type->typ_refcnt--;
147         module_put(type->typ_ops->o_owner);
148         spin_unlock(&type->obd_type_lock);
149 }
150
151 int class_register_type(struct obd_ops *ops, struct lprocfs_vars *vars,
152                         const char *name)
153 {
154         struct obd_type *type;
155         int rc = 0;
156         ENTRY;
157
158         LASSERT(strnlen(name, 1024) < 1024);    /* sanity check */
159
160         if (class_search_type(name)) {
161                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
162                 RETURN(-EEXIST);
163         }
164
165         rc = -ENOMEM;
166         OBD_ALLOC(type, sizeof(*type));
167         if (type == NULL)
168                 RETURN(rc);
169
170         OBD_ALLOC(type->typ_ops, sizeof(*type->typ_ops));
171         OBD_ALLOC(type->typ_name, strlen(name) + 1);
172         if (type->typ_ops == NULL || type->typ_name == NULL)
173                 GOTO (failed, rc);
174
175         *(type->typ_ops) = *ops;
176         strcpy(type->typ_name, name);
177         spin_lock_init(&type->obd_type_lock);
178
179 #ifdef LPROCFS
180         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
181                                               vars, type);
182         if (IS_ERR(type->typ_procroot)) {
183                 rc = PTR_ERR(type->typ_procroot);
184                 type->typ_procroot = NULL;
185                 GOTO (failed, rc);
186         }
187 #endif
188
189         spin_lock(&obd_types_lock);
190         list_add(&type->typ_chain, &obd_types);
191         spin_unlock(&obd_types_lock);
192
193         RETURN (0);
194
195  failed:
196         if (type->typ_name != NULL)
197                 OBD_FREE(type->typ_name, strlen(name) + 1);
198         if (type->typ_ops != NULL)
199                 OBD_FREE (type->typ_ops, sizeof (*type->typ_ops));
200         OBD_FREE(type, sizeof(*type));
201         RETURN(rc);
202 }
203
204 int class_unregister_type(const char *name)
205 {
206         struct obd_type *type = class_search_type(name);
207         ENTRY;
208
209         if (!type) {
210                 CERROR("unknown obd type\n");
211                 RETURN(-EINVAL);
212         }
213
214         if (type->typ_refcnt) {
215                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
216                 /* This is a bad situation, let's make the best of it */
217                 /* Remove ops, but leave the name for debugging */
218                 OBD_FREE(type->typ_ops, sizeof(*type->typ_ops));
219                 RETURN(-EBUSY);
220         }
221
222         if (type->typ_procroot)
223                 lprocfs_remove(&type->typ_procroot);
224
225         spin_lock(&obd_types_lock);
226         list_del(&type->typ_chain);
227         spin_unlock(&obd_types_lock);
228         OBD_FREE(type->typ_name, strlen(name) + 1);
229         if (type->typ_ops != NULL)
230                 OBD_FREE(type->typ_ops, sizeof(*type->typ_ops));
231         OBD_FREE(type, sizeof(*type));
232         RETURN(0);
233 } /* class_unregister_type */
234
235 /**
236  * Create a new obd device.
237  *
238  * Find an empty slot in ::obd_devs[], create a new obd device in it.
239  *
240  * \param typename [in] obd device type string.
241  * \param name     [in] obd device name.
242  *
243  * \retval NULL if create fails, otherwise return the obd device
244  *         pointer created.
245  */
246 struct obd_device *class_newdev(const char *type_name, const char *name)
247 {
248         struct obd_device *result = NULL;
249         struct obd_device *newdev;
250         struct obd_type *type = NULL;
251         int i;
252         int new_obd_minor = 0;
253
254         if (strlen(name) >= MAX_OBD_NAME) {
255                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
256                 RETURN(ERR_PTR(-EINVAL));
257         }
258
259         type = class_get_type(type_name);
260         if (type == NULL){
261                 CERROR("OBD: unknown type: %s\n", type_name);
262                 RETURN(ERR_PTR(-ENODEV));
263         }
264
265         newdev = obd_device_alloc();
266         if (newdev == NULL) {
267                 class_put_type(type);
268                 RETURN(ERR_PTR(-ENOMEM));
269         }
270         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
271
272         spin_lock(&obd_dev_lock);
273         for (i = 0; i < class_devno_max(); i++) {
274                 struct obd_device *obd = class_num2obd(i);
275                 if (obd && obd->obd_name && (strcmp(name, obd->obd_name) == 0)){
276                         CERROR("Device %s already exists, won't add\n", name);
277                         if (result) {
278                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
279                                          "%p obd_magic %08x != %08x\n", result,
280                                          result->obd_magic, OBD_DEVICE_MAGIC);
281                                 LASSERTF(result->obd_minor == new_obd_minor,
282                                          "%p obd_minor %d != %d\n", result,
283                                          result->obd_minor, new_obd_minor);
284
285                                 obd_devs[result->obd_minor] = NULL;
286                                 result->obd_name[0]='\0';
287                         }
288                         result = ERR_PTR(-EEXIST);
289                         break;
290                 }
291                 if (!result && !obd) {
292                         result = newdev;
293                         result->obd_minor = i;
294                         new_obd_minor = i;
295                         result->obd_type = type;
296                         strncpy(result->obd_name, name,
297                                 sizeof(result->obd_name) - 1);
298                         obd_devs[i] = result;
299                 }
300         }
301         spin_unlock(&obd_dev_lock);
302
303         if (result == NULL && i >= class_devno_max()) {
304                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
305                        class_devno_max());
306                 result = ERR_PTR(-EOVERFLOW);
307         }
308
309         if (IS_ERR(result)) {
310                 obd_device_free(newdev);
311                 class_put_type(type);
312         } else {
313                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
314                        result->obd_name, result);
315         }
316         return result;
317 }
318
319 void class_release_dev(struct obd_device *obd)
320 {
321         struct obd_type *obd_type = obd->obd_type;
322
323         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
324                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
325         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
326                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
327         LASSERT(obd_type != NULL);
328
329         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
330                obd->obd_name,obd->obd_type->typ_name);
331
332         spin_lock(&obd_dev_lock);
333         obd_devs[obd->obd_minor] = NULL;
334         spin_unlock(&obd_dev_lock);
335         obd_device_free(obd);
336
337         class_put_type(obd_type);
338 }
339
340 int class_name2dev(const char *name)
341 {
342         int i;
343
344         if (!name)
345                 return -1;
346
347         spin_lock(&obd_dev_lock);
348         for (i = 0; i < class_devno_max(); i++) {
349                 struct obd_device *obd = class_num2obd(i);
350                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
351                         /* Make sure we finished attaching before we give
352                            out any references */
353                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
354                         if (obd->obd_attached) {
355                                 spin_unlock(&obd_dev_lock);
356                                 return i;
357                         }
358                         break;
359                 }
360         }
361         spin_unlock(&obd_dev_lock);
362
363         return -1;
364 }
365
366 struct obd_device *class_name2obd(const char *name)
367 {
368         int dev = class_name2dev(name);
369
370         if (dev < 0 || dev > class_devno_max())
371                 return NULL;
372         return class_num2obd(dev);
373 }
374
375 int class_uuid2dev(struct obd_uuid *uuid)
376 {
377         int i;
378
379         spin_lock(&obd_dev_lock);
380         for (i = 0; i < class_devno_max(); i++) {
381                 struct obd_device *obd = class_num2obd(i);
382                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
383                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
384                         spin_unlock(&obd_dev_lock);
385                         return i;
386                 }
387         }
388         spin_unlock(&obd_dev_lock);
389
390         return -1;
391 }
392
393 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
394 {
395         int dev = class_uuid2dev(uuid);
396         if (dev < 0)
397                 return NULL;
398         return class_num2obd(dev);
399 }
400
401 /**
402  * Get obd device from ::obd_devs[]
403  *
404  * \param num [in] array index
405  *
406  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
407  *         otherwise return the obd device there.
408  */
409 struct obd_device *class_num2obd(int num)
410 {
411         struct obd_device *obd = NULL;
412
413         if (num < class_devno_max()) {
414                 obd = obd_devs[num];
415                 if (obd == NULL)
416                         return NULL;
417
418                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
419                          "%p obd_magic %08x != %08x\n",
420                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
421                 LASSERTF(obd->obd_minor == num,
422                          "%p obd_minor %0d != %0d\n",
423                          obd, obd->obd_minor, num);
424         }
425
426         return obd;
427 }
428
429 void class_obd_list(void)
430 {
431         char *status;
432         int i;
433
434         spin_lock(&obd_dev_lock);
435         for (i = 0; i < class_devno_max(); i++) {
436                 struct obd_device *obd = class_num2obd(i);
437                 if (obd == NULL)
438                         continue;
439                 if (obd->obd_stopping)
440                         status = "ST";
441                 else if (obd->obd_set_up)
442                         status = "UP";
443                 else if (obd->obd_attached)
444                         status = "AT";
445                 else
446                         status = "--";
447                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
448                          i, status, obd->obd_type->typ_name,
449                          obd->obd_name, obd->obd_uuid.uuid,
450                          atomic_read(&obd->obd_refcount));
451         }
452         spin_unlock(&obd_dev_lock);
453         return;
454 }
455
456 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
457    specified, then only the client with that uuid is returned,
458    otherwise any client connected to the tgt is returned. */
459 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
460                                           const char * typ_name,
461                                           struct obd_uuid *grp_uuid)
462 {
463         int i;
464
465         spin_lock(&obd_dev_lock);
466         for (i = 0; i < class_devno_max(); i++) {
467                 struct obd_device *obd = class_num2obd(i);
468                 if (obd == NULL)
469                         continue;
470                 if ((strncmp(obd->obd_type->typ_name, typ_name,
471                              strlen(typ_name)) == 0)) {
472                         if (obd_uuid_equals(tgt_uuid,
473                                             &obd->u.cli.cl_target_uuid) &&
474                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
475                                                          &obd->obd_uuid) : 1)) {
476                                 spin_unlock(&obd_dev_lock);
477                                 return obd;
478                         }
479                 }
480         }
481         spin_unlock(&obd_dev_lock);
482
483         return NULL;
484 }
485
486 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
487                                             struct obd_uuid *grp_uuid)
488 {
489         struct obd_device *obd;
490
491         obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
492         if (!obd)
493                 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
494                                             grp_uuid);
495         return obd;
496 }
497
498 /* Iterate the obd_device list looking devices have grp_uuid. Start
499    searching at *next, and if a device is found, the next index to look
500    at is saved in *next. If next is NULL, then the first matching device
501    will always be returned. */
502 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
503 {
504         int i;
505
506         if (next == NULL)
507                 i = 0;
508         else if (*next >= 0 && *next < class_devno_max())
509                 i = *next;
510         else
511                 return NULL;
512
513         spin_lock(&obd_dev_lock);
514         for (; i < class_devno_max(); i++) {
515                 struct obd_device *obd = class_num2obd(i);
516                 if (obd == NULL)
517                         continue;
518                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
519                         if (next != NULL)
520                                 *next = i+1;
521                         spin_unlock(&obd_dev_lock);
522                         return obd;
523                 }
524         }
525         spin_unlock(&obd_dev_lock);
526
527         return NULL;
528 }
529
530
531 void obd_cleanup_caches(void)
532 {
533         int rc;
534
535         ENTRY;
536         if (obd_device_cachep) {
537                 rc = cfs_mem_cache_destroy(obd_device_cachep);
538                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
539                 obd_device_cachep = NULL;
540         }
541         if (obdo_cachep) {
542                 rc = cfs_mem_cache_destroy(obdo_cachep);
543                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
544                 obdo_cachep = NULL;
545         }
546         if (import_cachep) {
547                 rc = cfs_mem_cache_destroy(import_cachep);
548                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
549                 import_cachep = NULL;
550         }
551         EXIT;
552 }
553
554 int obd_init_caches(void)
555 {
556         ENTRY;
557
558         LASSERT(obd_device_cachep == NULL);
559         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
560                                                  sizeof(struct obd_device),
561                                                  0, 0);
562         if (!obd_device_cachep)
563                 GOTO(out, -ENOMEM);
564
565         LASSERT(obdo_cachep == NULL);
566         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
567                                            0, 0);
568         if (!obdo_cachep)
569                 GOTO(out, -ENOMEM);
570
571         LASSERT(import_cachep == NULL);
572         import_cachep = cfs_mem_cache_create("ll_import_cache",
573                                              sizeof(struct obd_import),
574                                              0, 0);
575         if (!import_cachep)
576                 GOTO(out, -ENOMEM);
577
578         RETURN(0);
579  out:
580         obd_cleanup_caches();
581         RETURN(-ENOMEM);
582
583 }
584
585 /* map connection to client */
586 struct obd_export *class_conn2export(struct lustre_handle *conn)
587 {
588         struct obd_export *export;
589         ENTRY;
590
591         if (!conn) {
592                 CDEBUG(D_CACHE, "looking for null handle\n");
593                 RETURN(NULL);
594         }
595
596         if (conn->cookie == -1) {  /* this means assign a new connection */
597                 CDEBUG(D_CACHE, "want a new connection\n");
598                 RETURN(NULL);
599         }
600
601         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
602         export = class_handle2object(conn->cookie);
603         RETURN(export);
604 }
605
606 struct obd_device *class_exp2obd(struct obd_export *exp)
607 {
608         if (exp)
609                 return exp->exp_obd;
610         return NULL;
611 }
612
613 struct obd_device *class_conn2obd(struct lustre_handle *conn)
614 {
615         struct obd_export *export;
616         export = class_conn2export(conn);
617         if (export) {
618                 struct obd_device *obd = export->exp_obd;
619                 class_export_put(export);
620                 return obd;
621         }
622         return NULL;
623 }
624
625 struct obd_import *class_exp2cliimp(struct obd_export *exp)
626 {
627         struct obd_device *obd = exp->exp_obd;
628         if (obd == NULL)
629                 return NULL;
630         return obd->u.cli.cl_import;
631 }
632
633 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
634 {
635         struct obd_device *obd = class_conn2obd(conn);
636         if (obd == NULL)
637                 return NULL;
638         return obd->u.cli.cl_import;
639 }
640
/* Export management functions */

/* Addref callback registered with class_handle_hash() for export handles:
 * takes a reference on the export it is given. */
static void export_handle_addref(void *export)
{
        class_export_get(export);
}
646
647 /* called from mds_commit_cb() in context of journal commit callback
648  * and cannot call any blocking functions. */
649 void __class_export_put(struct obd_export *exp)
650 {
651         if (atomic_dec_and_test(&exp->exp_refcount)) {
652                 LASSERT (list_empty(&exp->exp_obd_chain));
653
654                 CDEBUG(D_IOCTL, "final put %p/%s\n",
655                        exp, exp->exp_client_uuid.uuid);
656
657                 spin_lock(&obd_zombie_impexp_lock);
658                 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
659                 spin_unlock(&obd_zombie_impexp_lock);
660
661                 obd_zombie_impexp_notify();
662         }
663 }
664 EXPORT_SYMBOL(__class_export_put);
665
666 void class_export_destroy(struct obd_export *exp)
667 {
668         struct obd_device *obd = exp->exp_obd;
669
670         LASSERT (atomic_read(&exp->exp_refcount) == 0);
671
672         CDEBUG(D_IOCTL, "destroying export %p/%s\n", exp,
673                exp->exp_client_uuid.uuid);
674
675         LASSERT(obd != NULL);
676
677         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
678         if (exp->exp_connection)
679                 ptlrpc_put_connection_superhack(exp->exp_connection);
680
681         LASSERT(list_empty(&exp->exp_outstanding_replies));
682         LASSERT(list_empty(&exp->exp_uncommitted_replies));
683         LASSERT(list_empty(&exp->exp_req_replay_queue));
684         LASSERT(list_empty(&exp->exp_queued_rpc));
685         obd_destroy_export(exp);
686
687         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
688         class_decref(obd);
689 }
690
691 /* Creates a new export, adds it to the hash table, and returns a
692  * pointer to it. The refcount is 2: one for the hash reference, and
693  * one for the pointer returned by this function. */
694 struct obd_export *class_new_export(struct obd_device *obd,
695                                     struct obd_uuid *cluuid)
696 {
697         struct obd_export *export;
698         int rc = 0;
699
700         OBD_ALLOC(export, sizeof(*export));
701         if (!export)
702                 return ERR_PTR(-ENOMEM);
703
704         export->exp_conn_cnt = 0;
705         export->exp_lock_hash = NULL;
706         atomic_set(&export->exp_refcount, 2);
707         atomic_set(&export->exp_rpc_count, 0);
708         export->exp_obd = obd;
709         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
710         spin_lock_init(&export->exp_uncommitted_replies_lock);
711         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
712         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
713         CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
714
715         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
716         class_handle_hash(&export->exp_handle, export_handle_addref);
717         export->exp_last_request_time = cfs_time_current_sec();
718         spin_lock_init(&export->exp_lock);
719         INIT_HLIST_NODE(&export->exp_uuid_hash);
720         INIT_HLIST_NODE(&export->exp_nid_hash);
721
722         export->exp_client_uuid = *cluuid;
723         obd_init_export(export);
724
725         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
726                 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
727                                             &export->exp_uuid_hash);
728                 if (rc != 0) {
729                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
730                                       obd->obd_name, cluuid->uuid, rc);
731                         class_handle_unhash(&export->exp_handle);
732                         OBD_FREE_PTR(export);
733                         return ERR_PTR(-EALREADY);
734                 }
735         }
736
737         spin_lock(&obd->obd_dev_lock);
738         LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
739         class_incref(obd);
740         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
741         list_add_tail(&export->exp_obd_chain_timed,
742                       &export->exp_obd->obd_exports_timed);
743         export->exp_obd->obd_num_exports++;
744         spin_unlock(&obd->obd_dev_lock);
745
746         return export;
747 }
748 EXPORT_SYMBOL(class_new_export);
749
750 void class_unlink_export(struct obd_export *exp)
751 {
752         class_handle_unhash(&exp->exp_handle);
753
754         spin_lock(&exp->exp_obd->obd_dev_lock);
755         /* delete an uuid-export hashitem from hashtables */
756         if (!hlist_unhashed(&exp->exp_uuid_hash))
757                 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
758                                 &exp->exp_client_uuid,
759                                 &exp->exp_uuid_hash);
760
761         list_del_init(&exp->exp_obd_chain);
762         list_del_init(&exp->exp_obd_chain_timed);
763         exp->exp_obd->obd_num_exports--;
764         spin_unlock(&exp->exp_obd->obd_dev_lock);
765         /* Keep these counter valid always */
766         spin_lock_bh(&exp->exp_obd->obd_processing_task_lock);
767         if (exp->exp_delayed) {
768                 spin_lock(&exp->exp_lock);
769                 exp->exp_delayed = 0;
770                 spin_unlock(&exp->exp_lock);
771                 LASSERT(exp->exp_obd->obd_delayed_clients);
772                 exp->exp_obd->obd_delayed_clients--;
773         } else if (exp->exp_replay_needed) {
774                         spin_lock(&exp->exp_lock);
775                         exp->exp_replay_needed = 0;
776                         spin_unlock(&exp->exp_lock);
777                         LASSERT(exp->exp_obd->obd_recoverable_clients);
778                         exp->exp_obd->obd_recoverable_clients--;
779         }
780
781         if (exp->exp_obd->obd_recovering && exp->exp_in_recovery) {
782                 spin_lock(&exp->exp_lock);
783                 exp->exp_in_recovery = 0;
784                 spin_unlock(&exp->exp_lock);
785                 LASSERT(exp->exp_obd->obd_connected_clients);
786                 exp->exp_obd->obd_connected_clients--;
787         }
788         spin_unlock_bh(&exp->exp_obd->obd_processing_task_lock);
789         class_export_put(exp);
790 }
791 EXPORT_SYMBOL(class_unlink_export);
792
/* Import management functions */

/* Addref callback registered with class_handle_hash() for import handles:
 * takes a reference on the import it is given. */
static void import_handle_addref(void *import)
{
        class_import_get(import);
}
798
799 struct obd_import *class_import_get(struct obd_import *import)
800 {
801         LASSERT(atomic_read(&import->imp_refcount) >= 0);
802         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
803         atomic_inc(&import->imp_refcount);
804         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
805                atomic_read(&import->imp_refcount), 
806                import->imp_obd->obd_name);
807         return import;
808 }
809 EXPORT_SYMBOL(class_import_get);
810
811 void class_import_put(struct obd_import *import)
812 {
813         ENTRY;
814
815         LASSERT(atomic_read(&import->imp_refcount) > 0);
816         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
817         LASSERT(list_empty(&import->imp_zombie_chain));
818
819         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
820                atomic_read(&import->imp_refcount) - 1, 
821                import->imp_obd->obd_name);
822
823         if (atomic_dec_and_test(&import->imp_refcount)) {
824                 CDEBUG(D_INFO, "final put import %p\n", import);
825                 spin_lock(&obd_zombie_impexp_lock);
826                 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
827                 spin_unlock(&obd_zombie_impexp_lock);
828
829                 obd_zombie_impexp_notify();
830         }
831
832         EXIT;
833 }
834 EXPORT_SYMBOL(class_import_put);
835
836 void class_import_destroy(struct obd_import *import)
837 {
838         ENTRY;
839
840         CDEBUG(D_IOCTL, "destroying import %p\n", import);
841
842         LASSERT(atomic_read(&import->imp_refcount) == 0);
843
844         ptlrpc_put_connection_superhack(import->imp_connection);
845
846         while (!list_empty(&import->imp_conn_list)) {
847                 struct obd_import_conn *imp_conn;
848
849                 imp_conn = list_entry(import->imp_conn_list.next,
850                                       struct obd_import_conn, oic_item);
851                 list_del(&imp_conn->oic_item);
852                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
853                 OBD_FREE(imp_conn, sizeof(*imp_conn));
854         }
855
856         class_decref(import->imp_obd);
857         OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
858         EXIT;
859 }
860
861 static void init_imp_at(struct imp_at *at) {
862         int i;
863         at_init(&at->iat_net_latency, 0, 0);
864         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
865                 /* max service estimates are tracked on the server side, so
866                    don't use the AT history here, just use the last reported
867                    val. (But keep hist for proc histogram, worst_ever) */
868                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
869                         AT_FLG_NOHIST);
870         }
871 }
872
873 struct obd_import *class_new_import(struct obd_device *obd)
874 {
875         struct obd_import *imp;
876
877         OBD_ALLOC(imp, sizeof(*imp));
878         if (imp == NULL)
879                 return NULL;
880
881         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
882         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
883         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
884         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
885         spin_lock_init(&imp->imp_lock);
886         imp->imp_last_success_conn = 0;
887         imp->imp_state = LUSTRE_IMP_NEW;
888         imp->imp_obd = class_incref(obd);
889         cfs_waitq_init(&imp->imp_recovery_waitq);
890
891         atomic_set(&imp->imp_refcount, 2);
892         atomic_set(&imp->imp_unregistering, 0);
893         atomic_set(&imp->imp_inflight, 0);
894         atomic_set(&imp->imp_replay_inflight, 0);
895         atomic_set(&imp->imp_inval_count, 0);
896         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
897         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
898         class_handle_hash(&imp->imp_handle, import_handle_addref);
899         init_imp_at(&imp->imp_at);
900
901 /* b1_8 supports both v1 & v2. but HEAD only supports v2.
902  * So let's use v2.
903  */
904 #define HAVE_DEFAULT_V2_CONNECT 1
905 #ifdef HAVE_DEFAULT_V2_CONNECT
906         /* the default magic is V2, will be used in connect RPC, and
907          * then adjusted according to the flags in request/reply. */
908         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
909 #else
910         /* the default magic is V1, will be used in connect RPC, and
911          * then adjusted according to the flags in request/reply. */
912         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V1;
913 #endif
914
915         return imp;
916 }
917 EXPORT_SYMBOL(class_new_import);
918
919 void class_destroy_import(struct obd_import *import)
920 {
921         LASSERT(import != NULL);
922         LASSERT(import != LP_POISON);
923
924         class_handle_unhash(&import->imp_handle);
925
926         spin_lock(&import->imp_lock);
927         import->imp_generation++;
928         spin_unlock(&import->imp_lock);
929
930         class_import_put(import);
931 }
932 EXPORT_SYMBOL(class_destroy_import);
933
934 /* A connection defines an export context in which preallocation can
935    be managed. This releases the export pointer reference, and returns
936    the export handle, so the export refcount is 1 when this function
937    returns. */
938 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
939                   struct obd_uuid *cluuid)
940 {
941         struct obd_export *export;
942         LASSERT(conn != NULL);
943         LASSERT(obd != NULL);
944         LASSERT(cluuid != NULL);
945         ENTRY;
946
947         export = class_new_export(obd, cluuid);
948         if (IS_ERR(export))
949                 RETURN(PTR_ERR(export));
950
951         conn->cookie = export->exp_handle.h_cookie;
952         class_export_put(export);
953
954         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
955                cluuid->uuid, conn->cookie);
956         RETURN(0);
957 }
958 EXPORT_SYMBOL(class_connect);
959
960 /* This function removes 1-3 references from the export:
961  * 1 - for export pointer passed
962  * and if disconnect really need
963  * 2 - removing from hash
964  * 3 - in client_unlink_export
965  * The export pointer passed to this function can destroyed */
966 int class_disconnect(struct obd_export *export)
967 {
968         int already_disconnected;
969         ENTRY;
970
971         if (export == NULL) {
972                 fixme();
973                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
974                 RETURN(-EINVAL);
975         }
976
977         spin_lock(&export->exp_lock);
978         already_disconnected = export->exp_disconnected;
979         export->exp_disconnected = 1;
980         spin_unlock(&export->exp_lock);
981
982
983         /* class_cleanup(), abort_recovery(), and class_fail_export()
984          * all end up in here, and if any of them race we shouldn't
985          * call extra class_export_puts(). */
986         if (already_disconnected)
987                 GOTO(no_disconn, already_disconnected);
988
989         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
990                export->exp_handle.h_cookie);
991
992
993         if (!hlist_unhashed(&export->exp_nid_hash))
994                 lustre_hash_del(export->exp_obd->obd_nid_hash,
995                                 &export->exp_connection->c_peer.nid,
996                                 &export->exp_nid_hash);
997
998         class_unlink_export(export);
999
1000 no_disconn:
1001         class_export_put(export);
1002         RETURN(0);
1003 }
1004
1005 /* Return non-zero for a fully connected export */
1006 int class_connected_export(struct obd_export *exp)
1007 {
1008         if (exp) {
1009                 int connected;
1010                 spin_lock(&exp->exp_lock);
1011                 connected = (exp->exp_conn_cnt > 0);
1012                 spin_unlock(&exp->exp_lock);
1013                 return connected;
1014         }
1015         return 0;
1016 }
1017 EXPORT_SYMBOL(class_connected_export);
1018
1019 static void class_disconnect_export_list(struct list_head *list,
1020                                          enum obd_option flags)
1021 {
1022         int rc;
1023         struct obd_export *exp;
1024         ENTRY;
1025
1026         /* It's possible that an export may disconnect itself, but
1027          * nothing else will be added to this list. */
1028         while (!list_empty(list)) {
1029                 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1030                 /* need for safe call CDEBUG after obd_disconnect */
1031                 class_export_get(exp);
1032
1033                 spin_lock(&exp->exp_lock);
1034                 exp->exp_flags = flags;
1035                 spin_unlock(&exp->exp_lock);
1036
1037                 if (obd_uuid_equals(&exp->exp_client_uuid,
1038                                     &exp->exp_obd->obd_uuid)) {
1039                         CDEBUG(D_HA,
1040                                "exp %p export uuid == obd uuid, don't discon\n",
1041                                exp);
1042                         /* Need to delete this now so we don't end up pointing
1043                          * to work_list later when this export is cleaned up. */
1044                         list_del_init(&exp->exp_obd_chain);
1045                         class_export_put(exp);
1046                         continue;
1047                 }
1048
1049                 class_export_get(exp);
1050                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1051                        "last request at %ld\n",
1052                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1053                        exp, exp->exp_last_request_time);
1054
1055                 /* release one export reference anyway */
1056                 rc = obd_disconnect(exp);
1057                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1058                        obd_export_nid2str(exp), exp, rc);
1059                 class_export_put(exp);
1060         }
1061         EXIT;
1062 }
1063
1064 void class_disconnect_exports(struct obd_device *obd)
1065 {
1066         struct list_head work_list;
1067         ENTRY;
1068
1069         /* Move all of the exports from obd_exports to a work list, en masse. */
1070         CFS_INIT_LIST_HEAD(&work_list);
1071         spin_lock(&obd->obd_dev_lock);
1072         list_splice_init(&obd->obd_delayed_exports, &work_list);
1073         list_splice_init(&obd->obd_exports, &work_list);
1074         spin_unlock(&obd->obd_dev_lock);
1075
1076         CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1077                "disconnecting them\n", obd->obd_minor, obd);
1078         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd));
1079         EXIT;
1080 }
1081 EXPORT_SYMBOL(class_disconnect_exports);
1082
1083 /* Remove exports that have not completed recovery. */
1084 void class_disconnect_stale_exports(struct obd_device *obd,
1085                                     enum obd_option flags)
1086 {
1087         struct list_head work_list;
1088         struct list_head *pos, *n;
1089         struct obd_export *exp;
1090         ENTRY;
1091
1092         CFS_INIT_LIST_HEAD(&work_list);
1093         spin_lock(&obd->obd_dev_lock);
1094         list_for_each_safe(pos, n, &obd->obd_exports) {
1095                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1096                 if (exp->exp_replay_needed) {
1097                         list_move(&exp->exp_obd_chain, &work_list);
1098                         obd->obd_stale_clients++;
1099                 }
1100         }
1101         spin_unlock(&obd->obd_dev_lock);
1102
1103         CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1104                obd->obd_name, obd->obd_stale_clients);
1105         class_disconnect_export_list(&work_list, flags);
1106         EXIT;
1107 }
1108 EXPORT_SYMBOL(class_disconnect_stale_exports);
1109
1110 void class_disconnect_expired_exports(struct obd_device *obd)
1111 {
1112         struct list_head expired_list;
1113         struct obd_export *exp, *n;
1114         int cnt = 0;
1115         ENTRY;
1116
1117         CFS_INIT_LIST_HEAD(&expired_list);
1118         spin_lock(&obd->obd_dev_lock);
1119         list_for_each_entry_safe(exp, n, &obd->obd_delayed_exports,
1120                                  exp_obd_chain) {
1121                 if (exp_expired(exp, obd->u.obt.obt_stale_export_age)) {
1122                         list_move(&exp->exp_obd_chain, &expired_list);
1123                         cnt++;
1124                 }
1125         }
1126         spin_unlock(&obd->obd_dev_lock);
1127
1128         if (cnt == 0)
1129                 return;
1130
1131         CDEBUG(D_INFO, "%s: disconnecting %d expired exports\n",
1132                obd->obd_name, cnt);
1133         class_disconnect_export_list(&expired_list, exp_flags_from_obd(obd));
1134
1135         EXIT;
1136 }
1137 EXPORT_SYMBOL(class_disconnect_expired_exports);
1138
1139 void class_set_export_delayed(struct obd_export *exp)
1140 {
1141         struct obd_device *obd = class_exp2obd(exp);
1142
1143         LASSERT(!exp->exp_delayed);
1144
1145         /* no need to ping delayed exports */
1146         spin_lock(&obd->obd_dev_lock);
1147         list_del_init(&exp->exp_obd_chain_timed);
1148         list_move_tail(&exp->exp_obd_chain, &obd->obd_delayed_exports);
1149         spin_unlock(&obd->obd_dev_lock);
1150
1151         LASSERT(obd->obd_recoverable_clients > 0);
1152
1153         spin_lock_bh(&obd->obd_processing_task_lock);
1154         /* race with target_queue_last_replay_reply? */
1155         if (exp->exp_replay_needed) {
1156                 spin_lock(&exp->exp_lock);
1157                 exp->exp_delayed = 1;
1158                 spin_unlock(&exp->exp_lock);
1159
1160                 obd->obd_delayed_clients++;
1161                 obd->obd_recoverable_clients--;
1162         }
1163         spin_unlock_bh(&obd->obd_processing_task_lock);
1164
1165         CDEBUG(D_HA, "%s: set client %s as delayed\n",
1166                obd->obd_name, exp->exp_client_uuid.uuid);
1167 }
1168 EXPORT_SYMBOL(class_set_export_delayed);
1169
1170 /*
1171  * Manage exports that have not completed recovery.
1172  */
1173 void class_handle_stale_exports(struct obd_device *obd)
1174 {
1175         struct list_head delay_list, evict_list;
1176         struct obd_export *exp, *n;
1177         int delayed = 0;
1178         ENTRY;
1179
1180         CFS_INIT_LIST_HEAD(&delay_list);
1181         CFS_INIT_LIST_HEAD(&evict_list);
1182         spin_lock(&obd->obd_dev_lock);
1183         list_for_each_entry_safe(exp, n, &obd->obd_exports, exp_obd_chain) {
1184                 LASSERT(!exp->exp_delayed);
1185                 /* clients finished recovery */
1186                 if (!exp->exp_replay_needed)
1187                         continue;
1188                 /* connected non-vbr clients are evicted */
1189                 if (exp->exp_in_recovery && !exp_connect_vbr(exp)) {
1190                         obd->obd_stale_clients++;
1191                         list_move_tail(&exp->exp_obd_chain, &evict_list);
1192                         continue;
1193                 }
1194                 if (obd->obd_version_recov || !exp->exp_in_recovery) {
1195                         list_move_tail(&exp->exp_obd_chain, &delay_list);
1196                         delayed++;
1197                 }
1198         }
1199 #ifndef HAVE_DELAYED_RECOVERY
1200         /* delayed recovery is turned off, evict all delayed exports */
1201         list_splice_init(&delay_list, &evict_list);
1202         list_splice_init(&obd->obd_delayed_exports, &evict_list);
1203         obd->obd_stale_clients += delayed;
1204 #endif
1205         spin_unlock(&obd->obd_dev_lock);
1206
1207         list_for_each_entry_safe(exp, n, &delay_list, exp_obd_chain) {
1208                 class_set_export_delayed(exp);
1209                 exp->exp_last_request_time = cfs_time_current_sec();
1210         }
1211         LASSERT(list_empty(&delay_list));
1212
1213         /* evict clients without VBR support */
1214         class_disconnect_export_list(&evict_list, exp_flags_from_obd(obd));
1215
1216         EXIT;
1217 }
1218 EXPORT_SYMBOL(class_handle_stale_exports);
1219
1220 int oig_init(struct obd_io_group **oig_out)
1221 {
1222         struct obd_io_group *oig;
1223         ENTRY;
1224
1225         OBD_ALLOC(oig, sizeof(*oig));
1226         if (oig == NULL)
1227                 RETURN(-ENOMEM);
1228
1229         spin_lock_init(&oig->oig_lock);
1230         oig->oig_rc = 0;
1231         oig->oig_pending = 0;
1232         atomic_set(&oig->oig_refcount, 1);
1233         cfs_waitq_init(&oig->oig_waitq);
1234         CFS_INIT_LIST_HEAD(&oig->oig_occ_list);
1235
1236         *oig_out = oig;
1237         RETURN(0);
1238 };
1239 EXPORT_SYMBOL(oig_init);
1240
1241 static inline void oig_grab(struct obd_io_group *oig)
1242 {
1243         atomic_inc(&oig->oig_refcount);
1244 }
1245
1246 void oig_release(struct obd_io_group *oig)
1247 {
1248         if (atomic_dec_and_test(&oig->oig_refcount))
1249                 OBD_FREE(oig, sizeof(*oig));
1250 }
1251 EXPORT_SYMBOL(oig_release);
1252
1253 int oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ)
1254 {
1255         int rc = 0;
1256         CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
1257         spin_lock(&oig->oig_lock);
1258         if (oig->oig_rc) {
1259                 rc = oig->oig_rc;
1260         } else {
1261                 oig->oig_pending++;
1262                 if (occ != NULL)
1263                         list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
1264         }
1265         spin_unlock(&oig->oig_lock);
1266         oig_grab(oig);
1267
1268         return rc;
1269 }
1270 EXPORT_SYMBOL(oig_add_one);
1271
1272 void oig_complete_one(struct obd_io_group *oig,
1273                       struct oig_callback_context *occ, int rc)
1274 {
1275         cfs_waitq_t *wake = NULL;
1276         int old_rc;
1277
1278         spin_lock(&oig->oig_lock);
1279
1280         if (occ != NULL)
1281                 list_del_init(&occ->occ_oig_item);
1282
1283         old_rc = oig->oig_rc;
1284         if (oig->oig_rc == 0 && rc != 0)
1285                 oig->oig_rc = rc;
1286
1287         if (--oig->oig_pending <= 0)
1288                 wake = &oig->oig_waitq;
1289
1290         spin_unlock(&oig->oig_lock);
1291
1292         CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
1293                         "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
1294                         oig->oig_pending);
1295         if (wake)
1296                 cfs_waitq_signal(wake);
1297         oig_release(oig);
1298 }
1299 EXPORT_SYMBOL(oig_complete_one);
1300
1301 static int oig_done(struct obd_io_group *oig)
1302 {
1303         int rc = 0;
1304         spin_lock(&oig->oig_lock);
1305         if (oig->oig_pending <= 0)
1306                 rc = 1;
1307         spin_unlock(&oig->oig_lock);
1308         return rc;
1309 }
1310
1311 static void interrupted_oig(void *data)
1312 {
1313         struct obd_io_group *oig = data;
1314         struct oig_callback_context *occ;
1315
1316         spin_lock(&oig->oig_lock);
1317         /* We need to restart the processing each time we drop the lock, as
1318          * it is possible other threads called oig_complete_one() to remove
1319          * an entry elsewhere in the list while we dropped lock.  We need to
1320          * drop the lock because osc_ap_completion() calls oig_complete_one()
1321          * which re-gets this lock ;-) as well as a lock ordering issue. */
1322 restart:
1323         list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
1324                 if (occ->interrupted)
1325                         continue;
1326                 occ->interrupted = 1;
1327                 spin_unlock(&oig->oig_lock);
1328                 occ->occ_interrupted(occ);
1329                 spin_lock(&oig->oig_lock);
1330                 goto restart;
1331         }
1332         spin_unlock(&oig->oig_lock);
1333 }
1334
1335 int oig_wait(struct obd_io_group *oig)
1336 {
1337         struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
1338         int rc;
1339
1340         CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
1341
1342         do {
1343                 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
1344                 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
1345                 /* we can't continue until the oig has emptied and stopped
1346                  * referencing state that the caller will free upon return */
1347                 if (rc == -EINTR)
1348                         lwi = (struct l_wait_info){ 0, };
1349         } while (rc == -EINTR);
1350
1351         LASSERTF(oig->oig_pending == 0,
1352                  "exiting oig_wait(oig = %p) with %d pending\n", oig,
1353                  oig->oig_pending);
1354
1355         CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
1356         return oig->oig_rc;
1357 }
1358 EXPORT_SYMBOL(oig_wait);
1359
1360 void class_fail_export(struct obd_export *exp)
1361 {
1362         int rc, already_failed;
1363
1364         spin_lock(&exp->exp_lock);
1365         already_failed = exp->exp_failed;
1366         exp->exp_failed = 1;
1367         spin_unlock(&exp->exp_lock);
1368
1369         if (already_failed) {
1370                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1371                        exp, exp->exp_client_uuid.uuid);
1372                 return;
1373         }
1374
1375         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1376                exp, exp->exp_client_uuid.uuid);
1377
1378         if (obd_dump_on_timeout)
1379                 libcfs_debug_dumplog();
1380
1381         /* Most callers into obd_disconnect are removing their own reference
1382          * (request, for example) in addition to the one from the hash table.
1383          * We don't have such a reference here, so make one. */
1384         class_export_get(exp);
1385         rc = obd_disconnect(exp);
1386         if (rc)
1387                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1388         else
1389                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1390                        exp, exp->exp_client_uuid.uuid);
1391 }
1392 EXPORT_SYMBOL(class_fail_export);
1393
1394 char *obd_export_nid2str(struct obd_export *exp)
1395 {
1396         if (exp->exp_connection != NULL)
1397                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1398
1399         return "(no nid)";
1400 }
1401 EXPORT_SYMBOL(obd_export_nid2str);
1402
1403 int obd_export_evict_by_nid(struct obd_device *obd, char *nid)
1404 {
1405         struct obd_export *doomed_exp = NULL;
1406         int exports_evicted = 0;
1407
1408         lnet_nid_t nid_key = libcfs_str2nid(nid);
1409
1410         do {
1411                 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1412
1413                 if (doomed_exp == NULL)
1414                         break;
1415
1416                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1417                          "nid %s found, wanted nid %s, requested nid %s\n",
1418                          obd_export_nid2str(doomed_exp),
1419                          libcfs_nid2str(nid_key), nid);
1420
1421                 exports_evicted++;
1422                 CDEBUG(D_HA, "%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1423                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1424                        exports_evicted);
1425                 class_fail_export(doomed_exp);
1426                 class_export_put(doomed_exp);
1427         } while (1);
1428
1429         if (!exports_evicted)
1430                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1431                        obd->obd_name, nid);
1432         return exports_evicted;
1433 }
1434 EXPORT_SYMBOL(obd_export_evict_by_nid);
1435
1436 int obd_export_evict_by_uuid(struct obd_device *obd, char *uuid)
1437 {
1438         struct obd_export *doomed_exp = NULL;
1439         struct obd_uuid doomed_uuid;
1440         int exports_evicted = 0;
1441
1442         obd_str2uuid(&doomed_uuid, uuid);
1443         if(obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1444                 CERROR("%s: can't evict myself\n", obd->obd_name);
1445                 return exports_evicted;
1446         }
1447
1448         doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1449
1450         if (doomed_exp == NULL) {
1451                 CERROR("%s: can't disconnect %s: no exports found\n",
1452                        obd->obd_name, uuid);
1453         } else {
1454                 CWARN("%s: evicting %s at adminstrative request\n",
1455                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1456                 class_fail_export(doomed_exp);
1457                 class_export_put(doomed_exp);
1458                 exports_evicted++;
1459         }
1460
1461         return exports_evicted;
1462 }
1463 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1464
1465 void obd_zombie_impexp_cull(void)
1466 {
1467         struct obd_import *import;
1468         struct obd_export *export;
1469
1470         do {
1471                 spin_lock (&obd_zombie_impexp_lock);
1472
1473                 import = NULL;
1474                 if (!list_empty(&obd_zombie_imports)) {
1475                         import = list_entry(obd_zombie_imports.next,
1476                                             struct obd_import,
1477                                             imp_zombie_chain);
1478                         list_del(&import->imp_zombie_chain);
1479                 }
1480
1481                 export = NULL;
1482                 if (!list_empty(&obd_zombie_exports)) {
1483                         export = list_entry(obd_zombie_exports.next,
1484                                             struct obd_export,
1485                                             exp_obd_chain);
1486                         list_del_init(&export->exp_obd_chain);
1487                 }
1488
1489                 spin_unlock(&obd_zombie_impexp_lock);
1490
1491                 if (import != NULL)
1492                         class_import_destroy(import);
1493
1494                 if (export != NULL)
1495                         class_export_destroy(export);
1496                 cfs_cond_resched();
1497         } while (import != NULL || export != NULL);
1498 }
1499
1500 static struct completion        obd_zombie_start;
1501 static struct completion        obd_zombie_stop;
1502 static unsigned long            obd_zombie_flags;
1503 static cfs_waitq_t              obd_zombie_waitq;
1504 static pid_t                    obd_zombie_pid;
1505
1506 enum {
1507         OBD_ZOMBIE_STOP = 1
1508 };
1509
1510 int obd_zombi_impexp_check(void *arg)
1511 {
1512         int rc;
1513
1514         spin_lock(&obd_zombie_impexp_lock);
1515         rc = list_empty(&obd_zombie_imports) &&
1516              list_empty(&obd_zombie_exports) &&
1517              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1518
1519         spin_unlock(&obd_zombie_impexp_lock);
1520
1521         RETURN(rc);
1522 }
1523
1524 static void obd_zombie_impexp_notify(void)
1525 {
1526         cfs_waitq_signal(&obd_zombie_waitq);
1527 }
1528
1529 /**
1530  * check whether obd_zombie is idle
1531  */
1532 static int obd_zombie_is_idle(void)
1533 {
1534         int rc;
1535
1536         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1537         spin_lock(&obd_zombie_impexp_lock);
1538         rc = list_empty(&obd_zombie_imports) &&
1539              list_empty(&obd_zombie_exports);
1540         spin_unlock(&obd_zombie_impexp_lock);
1541         return rc;
1542 }
1543
1544 /**
1545  * wait when obd_zombie import/export queues become empty
1546  */
1547 void obd_zombie_barrier(void)
1548 {
1549         struct l_wait_info lwi = { 0 };
1550
1551         if (obd_zombie_pid == cfs_curproc_pid())
1552                 /* don't wait for myself */
1553                 return;
1554         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1555 }
1556 EXPORT_SYMBOL(obd_zombie_barrier);
1557
1558 #ifdef __KERNEL__
1559
1560 static int obd_zombie_impexp_thread(void *unused)
1561 {
1562         int rc;
1563
1564         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1565                 complete(&obd_zombie_start);
1566                 RETURN(rc);
1567         }
1568
1569         complete(&obd_zombie_start);
1570
1571         obd_zombie_pid = cfs_curproc_pid();
1572
1573         while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1574                 struct l_wait_info lwi = { 0 };
1575
1576                 l_wait_event(obd_zombie_waitq, !obd_zombi_impexp_check(NULL), &lwi);
1577
1578                 obd_zombie_impexp_cull();
1579
1580                 /*
1581                  * Notify obd_zombie_barrier callers that queues
1582                  * may be empty.
1583                  */
1584                 cfs_waitq_signal(&obd_zombie_waitq);
1585         }
1586
1587         complete(&obd_zombie_stop);
1588
1589         RETURN(0);
1590 }
1591
1592 #else /* ! KERNEL */
1593
1594 static atomic_t zombi_recur = ATOMIC_INIT(0);
1595 static void *obd_zombi_impexp_work_cb;
1596 static void *obd_zombi_impexp_idle_cb;
1597
1598 int obd_zombi_impexp_kill(void *arg)
1599 {
1600         int rc = 0;
1601
1602         if (atomic_inc_return(&zombi_recur) == 1) {
1603                 obd_zombie_impexp_cull();
1604                 rc = 1;
1605         }
1606         atomic_dec(&zombi_recur);
1607         return rc;
1608 }
1609
1610 #endif
1611
1612 int obd_zombie_impexp_init(void)
1613 {
1614         int rc;
1615
1616         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1617         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1618         spin_lock_init(&obd_zombie_impexp_lock);
1619         init_completion(&obd_zombie_start);
1620         init_completion(&obd_zombie_stop);
1621         cfs_waitq_init(&obd_zombie_waitq);
1622         obd_zombie_pid = 0;
1623
1624 #ifdef __KERNEL__
1625         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1626         if (rc < 0)
1627                 RETURN(rc);
1628
1629         wait_for_completion(&obd_zombie_start);
1630 #else
1631
1632         obd_zombi_impexp_work_cb =
1633                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1634                                                  &obd_zombi_impexp_kill, NULL);
1635
1636         obd_zombi_impexp_idle_cb =
1637                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1638                                                  &obd_zombi_impexp_check, NULL);
1639         rc = 0;
1640
1641 #endif
1642         RETURN(rc);
1643 }
1644
1645 void obd_zombie_impexp_stop(void)
1646 {
1647         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1648         obd_zombie_impexp_notify();
1649 #ifdef __KERNEL__
1650         wait_for_completion(&obd_zombie_stop);
1651 #else
1652         liblustre_deregister_wait_callback(obd_zombi_impexp_work_cb);
1653         liblustre_deregister_idle_callback(obd_zombi_impexp_idle_cb);
1654 #endif
1655 }