Whamcloud - gitweb
b=21571 stacksize and locking fixes for loadgen patch from umka
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
50 extern struct list_head obd_types;
51 spinlock_t obd_types_lock;
52
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
57
58 struct list_head  obd_zombie_imports;
59 struct list_head  obd_zombie_exports;
60 spinlock_t        obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68
69 /*
70  * support functions: we could use inter-module communication, but this
71  * is more portable to other OS's
72  */
73 static struct obd_device *obd_device_alloc(void)
74 {
75         struct obd_device *obd;
76
77         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
78         if (obd != NULL) {
79                 obd->obd_magic = OBD_DEVICE_MAGIC;
80         }
81         return obd;
82 }
83 EXPORT_SYMBOL(obd_device_alloc);
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         struct list_head *tmp;
102         struct obd_type *type;
103
104         spin_lock(&obd_types_lock);
105         list_for_each(tmp, &obd_types) {
106                 type = list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115
116 struct obd_type *class_get_type(const char *name)
117 {
118         struct obd_type *type = class_search_type(name);
119
120 #ifdef CONFIG_KMOD
121         if (!type) {
122                 const char *modname = name;
123                 if (!request_module("%s", modname)) {
124                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
125                         type = class_search_type(name);
126                 } else {
127                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
128                                            modname);
129                 }
130         }
131 #endif
132         if (type) {
133                 spin_lock(&type->obd_type_lock);
134                 type->typ_refcnt++;
135                 try_module_get(type->typ_dt_ops->o_owner);
136                 spin_unlock(&type->obd_type_lock);
137         }
138         return type;
139 }
140
141 void class_put_type(struct obd_type *type)
142 {
143         LASSERT(type);
144         spin_lock(&type->obd_type_lock);
145         type->typ_refcnt--;
146         module_put(type->typ_dt_ops->o_owner);
147         spin_unlock(&type->obd_type_lock);
148 }
149
150 #define CLASS_MAX_NAME 1024
151
152 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
153                         struct lprocfs_vars *vars, const char *name,
154                         struct lu_device_type *ldt)
155 {
156         struct obd_type *type;
157         int rc = 0;
158         ENTRY;
159
160         /* sanity check */
161         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
162
163         if (class_search_type(name)) {
164                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
165                 RETURN(-EEXIST);
166         }
167
168         rc = -ENOMEM;
169         OBD_ALLOC(type, sizeof(*type));
170         if (type == NULL)
171                 RETURN(rc);
172
173         OBD_ALLOC_PTR(type->typ_dt_ops);
174         OBD_ALLOC_PTR(type->typ_md_ops);
175         OBD_ALLOC(type->typ_name, strlen(name) + 1);
176
177         if (type->typ_dt_ops == NULL ||
178             type->typ_md_ops == NULL ||
179             type->typ_name == NULL)
180                 GOTO (failed, rc);
181
182         *(type->typ_dt_ops) = *dt_ops;
183         /* md_ops is optional */
184         if (md_ops)
185                 *(type->typ_md_ops) = *md_ops;
186         strcpy(type->typ_name, name);
187         spin_lock_init(&type->obd_type_lock);
188
189 #ifdef LPROCFS
190         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
191                                               vars, type);
192         if (IS_ERR(type->typ_procroot)) {
193                 rc = PTR_ERR(type->typ_procroot);
194                 type->typ_procroot = NULL;
195                 GOTO (failed, rc);
196         }
197 #endif
198         if (ldt != NULL) {
199                 type->typ_lu = ldt;
200                 rc = lu_device_type_init(ldt);
201                 if (rc != 0)
202                         GOTO (failed, rc);
203         }
204
205         spin_lock(&obd_types_lock);
206         list_add(&type->typ_chain, &obd_types);
207         spin_unlock(&obd_types_lock);
208
209         RETURN (0);
210
211  failed:
212         if (type->typ_name != NULL)
213                 OBD_FREE(type->typ_name, strlen(name) + 1);
214         if (type->typ_md_ops != NULL)
215                 OBD_FREE_PTR(type->typ_md_ops);
216         if (type->typ_dt_ops != NULL)
217                 OBD_FREE_PTR(type->typ_dt_ops);
218         OBD_FREE(type, sizeof(*type));
219         RETURN(rc);
220 }
221
222 int class_unregister_type(const char *name)
223 {
224         struct obd_type *type = class_search_type(name);
225         ENTRY;
226
227         if (!type) {
228                 CERROR("unknown obd type\n");
229                 RETURN(-EINVAL);
230         }
231
232         if (type->typ_refcnt) {
233                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
234                 /* This is a bad situation, let's make the best of it */
235                 /* Remove ops, but leave the name for debugging */
236                 OBD_FREE_PTR(type->typ_dt_ops);
237                 OBD_FREE_PTR(type->typ_md_ops);
238                 RETURN(-EBUSY);
239         }
240
241         if (type->typ_procroot) {
242                 lprocfs_remove(&type->typ_procroot);
243         }
244
245         if (type->typ_lu)
246                 lu_device_type_fini(type->typ_lu);
247
248         spin_lock(&obd_types_lock);
249         list_del(&type->typ_chain);
250         spin_unlock(&obd_types_lock);
251         OBD_FREE(type->typ_name, strlen(name) + 1);
252         if (type->typ_dt_ops != NULL)
253                 OBD_FREE_PTR(type->typ_dt_ops);
254         if (type->typ_md_ops != NULL)
255                 OBD_FREE_PTR(type->typ_md_ops);
256         OBD_FREE(type, sizeof(*type));
257         RETURN(0);
258 } /* class_unregister_type */
259
260 /**
261  * Create a new obd device.
262  *
263  * Find an empty slot in ::obd_devs[], create a new obd device in it.
264  *
265  * \param[in] type_name obd device type string.
266  * \param[in] name      obd device name.
267  *
268  * \retval NULL if create fails, otherwise return the obd device
269  *         pointer created.
270  */
271 struct obd_device *class_newdev(const char *type_name, const char *name)
272 {
273         struct obd_device *result = NULL;
274         struct obd_device *newdev;
275         struct obd_type *type = NULL;
276         int i;
277         int new_obd_minor = 0;
278
279         if (strlen(name) >= MAX_OBD_NAME) {
280                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
281                 RETURN(ERR_PTR(-EINVAL));
282         }
283
284         type = class_get_type(type_name);
285         if (type == NULL){
286                 CERROR("OBD: unknown type: %s\n", type_name);
287                 RETURN(ERR_PTR(-ENODEV));
288         }
289
290         newdev = obd_device_alloc();
291         if (newdev == NULL) {
292                 class_put_type(type);
293                 RETURN(ERR_PTR(-ENOMEM));
294         }
295         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
296
297         spin_lock(&obd_dev_lock);
298         for (i = 0; i < class_devno_max(); i++) {
299                 struct obd_device *obd = class_num2obd(i);
300                 if (obd && obd->obd_name &&
301                     (strcmp(name, obd->obd_name) == 0)) {
302                         CERROR("Device %s already exists, won't add\n", name);
303                         if (result) {
304                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
305                                          "%p obd_magic %08x != %08x\n", result,
306                                          result->obd_magic, OBD_DEVICE_MAGIC);
307                                 LASSERTF(result->obd_minor == new_obd_minor,
308                                          "%p obd_minor %d != %d\n", result,
309                                          result->obd_minor, new_obd_minor);
310
311                                 obd_devs[result->obd_minor] = NULL;
312                                 result->obd_name[0]='\0';
313                          }
314                         result = ERR_PTR(-EEXIST);
315                         break;
316                 }
317                 if (!result && !obd) {
318                         result = newdev;
319                         result->obd_minor = i;
320                         new_obd_minor = i;
321                         result->obd_type = type;
322                         strncpy(result->obd_name, name,
323                                 sizeof(result->obd_name) - 1);
324                         obd_devs[i] = result;
325                 }
326         }
327         spin_unlock(&obd_dev_lock);
328
329         if (result == NULL && i >= class_devno_max()) {
330                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
331                        class_devno_max());
332                 result = ERR_PTR(-EOVERFLOW);
333         }
334
335         if (IS_ERR(result)) {
336                 obd_device_free(newdev);
337                 class_put_type(type);
338         } else {
339                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
340                        result->obd_name, result);
341         }
342         return result;
343 }
344
345 void class_release_dev(struct obd_device *obd)
346 {
347         struct obd_type *obd_type = obd->obd_type;
348
349         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
350                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
351         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
352                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
353         LASSERT(obd_type != NULL);
354
355         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
356                obd->obd_name,obd->obd_type->typ_name);
357
358         spin_lock(&obd_dev_lock);
359         obd_devs[obd->obd_minor] = NULL;
360         spin_unlock(&obd_dev_lock);
361         obd_device_free(obd);
362
363         class_put_type(obd_type);
364 }
365
366 int class_name2dev(const char *name)
367 {
368         int i;
369
370         if (!name)
371                 return -1;
372
373         spin_lock(&obd_dev_lock);
374         for (i = 0; i < class_devno_max(); i++) {
375                 struct obd_device *obd = class_num2obd(i);
376                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
377                         /* Make sure we finished attaching before we give
378                            out any references */
379                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
380                         if (obd->obd_attached) {
381                                 spin_unlock(&obd_dev_lock);
382                                 return i;
383                         }
384                         break;
385                 }
386         }
387         spin_unlock(&obd_dev_lock);
388
389         return -1;
390 }
391
392 struct obd_device *class_name2obd(const char *name)
393 {
394         int dev = class_name2dev(name);
395
396         if (dev < 0 || dev > class_devno_max())
397                 return NULL;
398         return class_num2obd(dev);
399 }
400
401 int class_uuid2dev(struct obd_uuid *uuid)
402 {
403         int i;
404
405         spin_lock(&obd_dev_lock);
406         for (i = 0; i < class_devno_max(); i++) {
407                 struct obd_device *obd = class_num2obd(i);
408                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
409                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
410                         spin_unlock(&obd_dev_lock);
411                         return i;
412                 }
413         }
414         spin_unlock(&obd_dev_lock);
415
416         return -1;
417 }
418
419 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
420 {
421         int dev = class_uuid2dev(uuid);
422         if (dev < 0)
423                 return NULL;
424         return class_num2obd(dev);
425 }
426
427 /**
428  * Get obd device from ::obd_devs[]
429  *
430  * \param num [in] array index
431  *
432  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
433  *         otherwise return the obd device there.
434  */
435 struct obd_device *class_num2obd(int num)
436 {
437         struct obd_device *obd = NULL;
438
439         if (num < class_devno_max()) {
440                 obd = obd_devs[num];
441                 if (obd == NULL)
442                         return NULL;
443
444                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
445                          "%p obd_magic %08x != %08x\n",
446                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
447                 LASSERTF(obd->obd_minor == num,
448                          "%p obd_minor %0d != %0d\n",
449                          obd, obd->obd_minor, num);
450         }
451
452         return obd;
453 }
454
455 void class_obd_list(void)
456 {
457         char *status;
458         int i;
459
460         spin_lock(&obd_dev_lock);
461         for (i = 0; i < class_devno_max(); i++) {
462                 struct obd_device *obd = class_num2obd(i);
463                 if (obd == NULL)
464                         continue;
465                 if (obd->obd_stopping)
466                         status = "ST";
467                 else if (obd->obd_set_up)
468                         status = "UP";
469                 else if (obd->obd_attached)
470                         status = "AT";
471                 else
472                         status = "--";
473                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
474                          i, status, obd->obd_type->typ_name,
475                          obd->obd_name, obd->obd_uuid.uuid,
476                          atomic_read(&obd->obd_refcount));
477         }
478         spin_unlock(&obd_dev_lock);
479         return;
480 }
481
482 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
483    specified, then only the client with that uuid is returned,
484    otherwise any client connected to the tgt is returned. */
485 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
486                                           const char * typ_name,
487                                           struct obd_uuid *grp_uuid)
488 {
489         int i;
490
491         spin_lock(&obd_dev_lock);
492         for (i = 0; i < class_devno_max(); i++) {
493                 struct obd_device *obd = class_num2obd(i);
494                 if (obd == NULL)
495                         continue;
496                 if ((strncmp(obd->obd_type->typ_name, typ_name,
497                              strlen(typ_name)) == 0)) {
498                         if (obd_uuid_equals(tgt_uuid,
499                                             &obd->u.cli.cl_target_uuid) &&
500                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
501                                                          &obd->obd_uuid) : 1)) {
502                                 spin_unlock(&obd_dev_lock);
503                                 return obd;
504                         }
505                 }
506         }
507         spin_unlock(&obd_dev_lock);
508
509         return NULL;
510 }
511
512 /* Iterate the obd_device list looking devices have grp_uuid. Start
513    searching at *next, and if a device is found, the next index to look
514    at is saved in *next. If next is NULL, then the first matching device
515    will always be returned. */
516 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
517 {
518         int i;
519
520         if (next == NULL)
521                 i = 0;
522         else if (*next >= 0 && *next < class_devno_max())
523                 i = *next;
524         else
525                 return NULL;
526
527         spin_lock(&obd_dev_lock);
528         for (; i < class_devno_max(); i++) {
529                 struct obd_device *obd = class_num2obd(i);
530                 if (obd == NULL)
531                         continue;
532                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
533                         if (next != NULL)
534                                 *next = i+1;
535                         spin_unlock(&obd_dev_lock);
536                         return obd;
537                 }
538         }
539         spin_unlock(&obd_dev_lock);
540
541         return NULL;
542 }
543
544 /**
545  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
546  * adjust sptlrpc settings accordingly.
547  */
548 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
549 {
550         struct obd_device  *obd;
551         const char         *type;
552         int                 i, rc = 0, rc2;
553
554         LASSERT(namelen > 0);
555
556         spin_lock(&obd_dev_lock);
557         for (i = 0; i < class_devno_max(); i++) {
558                 obd = class_num2obd(i);
559
560                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
561                         continue;
562
563                 /* only notify mdc, osc, mdt, ost */
564                 type = obd->obd_type->typ_name;
565                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
566                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
567                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
568                     strcmp(type, LUSTRE_OST_NAME) != 0)
569                         continue;
570
571                 if (strncmp(obd->obd_name, fsname, namelen))
572                         continue;
573
574                 class_incref(obd, __FUNCTION__, obd);
575                 spin_unlock(&obd_dev_lock);
576                 rc2 = obd_set_info_async(obd->obd_self_export,
577                                          sizeof(KEY_SPTLRPC_CONF),
578                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
579                 rc = rc ? rc : rc2;
580                 class_decref(obd, __FUNCTION__, obd);
581                 spin_lock(&obd_dev_lock);
582         }
583         spin_unlock(&obd_dev_lock);
584         return rc;
585 }
586 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
587
588 void obd_cleanup_caches(void)
589 {
590         int rc;
591
592         ENTRY;
593         if (obd_device_cachep) {
594                 rc = cfs_mem_cache_destroy(obd_device_cachep);
595                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
596                 obd_device_cachep = NULL;
597         }
598         if (obdo_cachep) {
599                 rc = cfs_mem_cache_destroy(obdo_cachep);
600                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
601                 obdo_cachep = NULL;
602         }
603         if (import_cachep) {
604                 rc = cfs_mem_cache_destroy(import_cachep);
605                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
606                 import_cachep = NULL;
607         }
608         if (capa_cachep) {
609                 rc = cfs_mem_cache_destroy(capa_cachep);
610                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
611                 capa_cachep = NULL;
612         }
613         EXIT;
614 }
615
616 int obd_init_caches(void)
617 {
618         ENTRY;
619
620         LASSERT(obd_device_cachep == NULL);
621         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
622                                                  sizeof(struct obd_device),
623                                                  0, 0);
624         if (!obd_device_cachep)
625                 GOTO(out, -ENOMEM);
626
627         LASSERT(obdo_cachep == NULL);
628         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
629                                            0, 0);
630         if (!obdo_cachep)
631                 GOTO(out, -ENOMEM);
632
633         LASSERT(import_cachep == NULL);
634         import_cachep = cfs_mem_cache_create("ll_import_cache",
635                                              sizeof(struct obd_import),
636                                              0, 0);
637         if (!import_cachep)
638                 GOTO(out, -ENOMEM);
639
640         LASSERT(capa_cachep == NULL);
641         capa_cachep = cfs_mem_cache_create("capa_cache",
642                                            sizeof(struct obd_capa), 0, 0);
643         if (!capa_cachep)
644                 GOTO(out, -ENOMEM);
645
646         RETURN(0);
647  out:
648         obd_cleanup_caches();
649         RETURN(-ENOMEM);
650
651 }
652
653 /* map connection to client */
654 struct obd_export *class_conn2export(struct lustre_handle *conn)
655 {
656         struct obd_export *export;
657         ENTRY;
658
659         if (!conn) {
660                 CDEBUG(D_CACHE, "looking for null handle\n");
661                 RETURN(NULL);
662         }
663
664         if (conn->cookie == -1) {  /* this means assign a new connection */
665                 CDEBUG(D_CACHE, "want a new connection\n");
666                 RETURN(NULL);
667         }
668
669         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
670         export = class_handle2object(conn->cookie);
671         RETURN(export);
672 }
673
674 struct obd_device *class_exp2obd(struct obd_export *exp)
675 {
676         if (exp)
677                 return exp->exp_obd;
678         return NULL;
679 }
680
681 struct obd_device *class_conn2obd(struct lustre_handle *conn)
682 {
683         struct obd_export *export;
684         export = class_conn2export(conn);
685         if (export) {
686                 struct obd_device *obd = export->exp_obd;
687                 class_export_put(export);
688                 return obd;
689         }
690         return NULL;
691 }
692
693 struct obd_import *class_exp2cliimp(struct obd_export *exp)
694 {
695         struct obd_device *obd = exp->exp_obd;
696         if (obd == NULL)
697                 return NULL;
698         return obd->u.cli.cl_import;
699 }
700
701 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
702 {
703         struct obd_device *obd = class_conn2obd(conn);
704         if (obd == NULL)
705                 return NULL;
706         return obd->u.cli.cl_import;
707 }
708
709 /* Export management functions */
710 static void class_export_destroy(struct obd_export *exp)
711 {
712         struct obd_device *obd = exp->exp_obd;
713         ENTRY;
714
715         LASSERT (atomic_read(&exp->exp_refcount) == 0);
716
717         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
718                exp->exp_client_uuid.uuid, obd->obd_name);
719
720         LASSERT(obd != NULL);
721
722         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
723         if (exp->exp_connection)
724                 ptlrpc_put_connection_superhack(exp->exp_connection);
725
726         LASSERT(list_empty(&exp->exp_outstanding_replies));
727         LASSERT(list_empty(&exp->exp_uncommitted_replies));
728         LASSERT(list_empty(&exp->exp_req_replay_queue));
729         LASSERT(list_empty(&exp->exp_queued_rpc));
730         obd_destroy_export(exp);
731         class_decref(obd, "export", exp);
732
733         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
734         EXIT;
735 }
736
/* Addref callback passed to class_handle_hash() for exports: takes one
 * export reference. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
741
742 struct obd_export *class_export_get(struct obd_export *exp)
743 {
744         atomic_inc(&exp->exp_refcount);
745         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
746                atomic_read(&exp->exp_refcount));
747         return exp;
748 }
749 EXPORT_SYMBOL(class_export_get);
750
751 void class_export_put(struct obd_export *exp)
752 {
753         LASSERT(exp != NULL);
754         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
755                atomic_read(&exp->exp_refcount) - 1);
756         LASSERT(atomic_read(&exp->exp_refcount) > 0);
757         LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
758
759         if (atomic_dec_and_test(&exp->exp_refcount)) {
760                 LASSERT(!list_empty(&exp->exp_obd_chain));
761                 CDEBUG(D_IOCTL, "final put %p/%s\n",
762                        exp, exp->exp_client_uuid.uuid);
763                 obd_zombie_export_add(exp);
764         }
765 }
766 EXPORT_SYMBOL(class_export_put);
767
768 /* Creates a new export, adds it to the hash table, and returns a
769  * pointer to it. The refcount is 2: one for the hash reference, and
770  * one for the pointer returned by this function. */
771 struct obd_export *class_new_export(struct obd_device *obd,
772                                     struct obd_uuid *cluuid)
773 {
774         struct obd_export *export;
775         int rc = 0;
776         ENTRY;
777
778         OBD_ALLOC_PTR(export);
779         if (!export)
780                 return ERR_PTR(-ENOMEM);
781
782         export->exp_conn_cnt = 0;
783         export->exp_lock_hash = NULL;
784         atomic_set(&export->exp_refcount, 2);
785         atomic_set(&export->exp_rpc_count, 0);
786         atomic_set(&export->exp_cb_count, 0);
787         atomic_set(&export->exp_locks_count, 0);
788 #if LUSTRE_TRACKS_LOCK_EXP_REFS
789         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
790         spin_lock_init(&export->exp_locks_list_guard);
791 #endif
792         atomic_set(&export->exp_replay_count, 0);
793         export->exp_obd = obd;
794         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
795         spin_lock_init(&export->exp_uncommitted_replies_lock);
796         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
797         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
798         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
799         CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
800         class_handle_hash(&export->exp_handle, export_handle_addref);
801         export->exp_last_request_time = cfs_time_current_sec();
802         spin_lock_init(&export->exp_lock);
803         INIT_HLIST_NODE(&export->exp_uuid_hash);
804         INIT_HLIST_NODE(&export->exp_nid_hash);
805
806         export->exp_sp_peer = LUSTRE_SP_ANY;
807         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
808         export->exp_client_uuid = *cluuid;
809         obd_init_export(export);
810
811         spin_lock(&obd->obd_dev_lock);
812          /* shouldn't happen, but might race */
813         if (obd->obd_stopping)
814                 GOTO(exit_err, rc = -ENODEV);
815
816         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
817                 rc = cfs_hash_add_unique(obd->obd_uuid_hash, cluuid,
818                                          &export->exp_uuid_hash);
819                 if (rc != 0) {
820                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
821                                       obd->obd_name, cluuid->uuid, rc);
822                         GOTO(exit_err, rc = -EALREADY);
823                 }
824         }
825
826         class_incref(obd, "export", export);
827         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
828         list_add_tail(&export->exp_obd_chain_timed,
829                       &export->exp_obd->obd_exports_timed);
830         export->exp_obd->obd_num_exports++;
831         spin_unlock(&obd->obd_dev_lock);
832         RETURN(export);
833
834 exit_err:
835         spin_unlock(&obd->obd_dev_lock);
836         class_handle_unhash(&export->exp_handle);
837         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
838         obd_destroy_export(export);
839         OBD_FREE_PTR(export);
840         return ERR_PTR(rc);
841 }
842 EXPORT_SYMBOL(class_new_export);
843
844 void class_unlink_export(struct obd_export *exp)
845 {
846         class_handle_unhash(&exp->exp_handle);
847
848         spin_lock(&exp->exp_obd->obd_dev_lock);
849         /* delete an uuid-export hashitem from hashtables */
850         if (!hlist_unhashed(&exp->exp_uuid_hash))
851                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
852                              &exp->exp_client_uuid,
853                              &exp->exp_uuid_hash);
854
855         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
856         list_del_init(&exp->exp_obd_chain_timed);
857         exp->exp_obd->obd_num_exports--;
858         spin_unlock(&exp->exp_obd->obd_dev_lock);
859         class_export_put(exp);
860 }
861 EXPORT_SYMBOL(class_unlink_export);
862
863 /* Import management functions */
864 void class_import_destroy(struct obd_import *imp)
865 {
866         ENTRY;
867
868         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
869                 imp->imp_obd->obd_name);
870
871         LASSERT(atomic_read(&imp->imp_refcount) == 0);
872
873         ptlrpc_put_connection_superhack(imp->imp_connection);
874
875         while (!list_empty(&imp->imp_conn_list)) {
876                 struct obd_import_conn *imp_conn;
877
878                 imp_conn = list_entry(imp->imp_conn_list.next,
879                                       struct obd_import_conn, oic_item);
880                 list_del_init(&imp_conn->oic_item);
881                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
882                 OBD_FREE(imp_conn, sizeof(*imp_conn));
883         }
884
885         LASSERT(imp->imp_sec == NULL);
886         class_decref(imp->imp_obd, "import", imp);
887         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
888         EXIT;
889 }
890
/**
 * Addref callback handed to class_handle_hash() for import handles:
 * takes one reference on the import passed as an opaque pointer.
 */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
895
896 struct obd_import *class_import_get(struct obd_import *import)
897 {
898         LASSERT(atomic_read(&import->imp_refcount) >= 0);
899         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
900         atomic_inc(&import->imp_refcount);
901         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
902                atomic_read(&import->imp_refcount),
903                import->imp_obd->obd_name);
904         return import;
905 }
906 EXPORT_SYMBOL(class_import_get);
907
908 void class_import_put(struct obd_import *imp)
909 {
910         ENTRY;
911
912         LASSERT(atomic_read(&imp->imp_refcount) > 0);
913         LASSERT(atomic_read(&imp->imp_refcount) < 0x5a5a5a);
914         LASSERT(list_empty(&imp->imp_zombie_chain));
915
916         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
917                atomic_read(&imp->imp_refcount) - 1,
918                imp->imp_obd->obd_name);
919
920         if (atomic_dec_and_test(&imp->imp_refcount)) {
921                 CDEBUG(D_INFO, "final put import %p\n", imp);
922                 obd_zombie_import_add(imp);
923         }
924
925         EXIT;
926 }
927 EXPORT_SYMBOL(class_import_put);
928
929 static void init_imp_at(struct imp_at *at) {
930         int i;
931         at_init(&at->iat_net_latency, 0, 0);
932         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
933                 /* max service estimates are tracked on the server side, so
934                    don't use the AT history here, just use the last reported
935                    val. (But keep hist for proc histogram, worst_ever) */
936                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
937                         AT_FLG_NOHIST);
938         }
939 }
940
941 struct obd_import *class_new_import(struct obd_device *obd)
942 {
943         struct obd_import *imp;
944
945         OBD_ALLOC(imp, sizeof(*imp));
946         if (imp == NULL)
947                 return NULL;
948
949         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
950         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
951         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
952         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
953         spin_lock_init(&imp->imp_lock);
954         imp->imp_last_success_conn = 0;
955         imp->imp_state = LUSTRE_IMP_NEW;
956         imp->imp_obd = class_incref(obd, "import", imp);
957         sema_init(&imp->imp_sec_mutex, 1);
958         cfs_waitq_init(&imp->imp_recovery_waitq);
959
960         atomic_set(&imp->imp_refcount, 2);
961         atomic_set(&imp->imp_unregistering, 0);
962         atomic_set(&imp->imp_inflight, 0);
963         atomic_set(&imp->imp_replay_inflight, 0);
964         atomic_set(&imp->imp_inval_count, 0);
965         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
966         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
967         class_handle_hash(&imp->imp_handle, import_handle_addref);
968         init_imp_at(&imp->imp_at);
969
970         /* the default magic is V2, will be used in connect RPC, and
971          * then adjusted according to the flags in request/reply. */
972         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
973
974         return imp;
975 }
976 EXPORT_SYMBOL(class_new_import);
977
978 void class_destroy_import(struct obd_import *import)
979 {
980         LASSERT(import != NULL);
981         LASSERT(import != LP_POISON);
982
983         class_handle_unhash(&import->imp_handle);
984
985         spin_lock(&import->imp_lock);
986         import->imp_generation++;
987         spin_unlock(&import->imp_lock);
988         class_import_put(import);
989 }
990 EXPORT_SYMBOL(class_destroy_import);
991
992 #if LUSTRE_TRACKS_LOCK_EXP_REFS
993
994 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
995 {
996         spin_lock(&exp->exp_locks_list_guard);
997
998         LASSERT(lock->l_exp_refs_nr >= 0);
999
1000         if (lock->l_exp_refs_target != NULL &&
1001             lock->l_exp_refs_target != exp) {
1002                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1003                               exp, lock, lock->l_exp_refs_target);
1004         }
1005         if ((lock->l_exp_refs_nr ++) == 0) {
1006                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1007                 lock->l_exp_refs_target = exp;
1008         }
1009         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1010                lock, exp, lock->l_exp_refs_nr);
1011         spin_unlock(&exp->exp_locks_list_guard);
1012 }
1013 EXPORT_SYMBOL(__class_export_add_lock_ref);
1014
1015 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1016 {
1017         spin_lock(&exp->exp_locks_list_guard);
1018         LASSERT(lock->l_exp_refs_nr > 0);
1019         if (lock->l_exp_refs_target != exp) {
1020                 LCONSOLE_WARN("lock %p, "
1021                               "mismatching export pointers: %p, %p\n",
1022                               lock, lock->l_exp_refs_target, exp);
1023         }
1024         if (-- lock->l_exp_refs_nr == 0) {
1025                 list_del_init(&lock->l_exp_refs_link);
1026                 lock->l_exp_refs_target = NULL;
1027         }
1028         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1029                lock, exp, lock->l_exp_refs_nr);
1030         spin_unlock(&exp->exp_locks_list_guard);
1031 }
1032 EXPORT_SYMBOL(__class_export_del_lock_ref);
1033 #endif
1034
1035 /* A connection defines an export context in which preallocation can
1036    be managed. This releases the export pointer reference, and returns
1037    the export handle, so the export refcount is 1 when this function
1038    returns. */
1039 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1040                   struct obd_uuid *cluuid)
1041 {
1042         struct obd_export *export;
1043         LASSERT(conn != NULL);
1044         LASSERT(obd != NULL);
1045         LASSERT(cluuid != NULL);
1046         ENTRY;
1047
1048         export = class_new_export(obd, cluuid);
1049         if (IS_ERR(export))
1050                 RETURN(PTR_ERR(export));
1051
1052         conn->cookie = export->exp_handle.h_cookie;
1053         class_export_put(export);
1054
1055         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1056                cluuid->uuid, conn->cookie);
1057         RETURN(0);
1058 }
1059 EXPORT_SYMBOL(class_connect);
1060
1061 /* if export is involved in recovery then clean up related things */
1062 void class_export_recovery_cleanup(struct obd_export *exp)
1063 {
1064         struct obd_device *obd = exp->exp_obd;
1065
1066         spin_lock_bh(&obd->obd_processing_task_lock);
1067         if (exp->exp_delayed)
1068                 obd->obd_delayed_clients--;
1069         if (obd->obd_recovering && exp->exp_in_recovery) {
1070                 spin_lock(&exp->exp_lock);
1071                 exp->exp_in_recovery = 0;
1072                 spin_unlock(&exp->exp_lock);
1073                 LASSERT(obd->obd_connected_clients);
1074                 obd->obd_connected_clients--;
1075         }
1076         /** Cleanup req replay fields */
1077         if (exp->exp_req_replay_needed) {
1078                 spin_lock(&exp->exp_lock);
1079                 exp->exp_req_replay_needed = 0;
1080                 spin_unlock(&exp->exp_lock);
1081                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1082                 atomic_dec(&obd->obd_req_replay_clients);
1083         }
1084         /** Cleanup lock replay data */
1085         if (exp->exp_lock_replay_needed) {
1086                 spin_lock(&exp->exp_lock);
1087                 exp->exp_lock_replay_needed = 0;
1088                 spin_unlock(&exp->exp_lock);
1089                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1090                 atomic_dec(&obd->obd_lock_replay_clients);
1091         }
1092         spin_unlock_bh(&obd->obd_processing_task_lock);
1093 }
1094
1095 /* This function removes 1-3 references from the export:
1096  * 1 - for export pointer passed
1097  * and if disconnect really need
1098  * 2 - removing from hash
1099  * 3 - in client_unlink_export
1100  * The export pointer passed to this function can destroyed */
1101 int class_disconnect(struct obd_export *export)
1102 {
1103         int already_disconnected;
1104         ENTRY;
1105
1106         if (export == NULL) {
1107                 fixme();
1108                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1109                 RETURN(-EINVAL);
1110         }
1111
1112         spin_lock(&export->exp_lock);
1113         already_disconnected = export->exp_disconnected;
1114         export->exp_disconnected = 1;
1115         spin_unlock(&export->exp_lock);
1116
1117         /* class_cleanup(), abort_recovery(), and class_fail_export()
1118          * all end up in here, and if any of them race we shouldn't
1119          * call extra class_export_puts(). */
1120         if (already_disconnected) {
1121                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1122                 GOTO(no_disconn, already_disconnected);
1123         }
1124
1125         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1126                export->exp_handle.h_cookie);
1127
1128         if (!hlist_unhashed(&export->exp_nid_hash))
1129                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1130                              &export->exp_connection->c_peer.nid,
1131                              &export->exp_nid_hash);
1132
1133         class_export_recovery_cleanup(export);
1134         class_unlink_export(export);
1135 no_disconn:
1136         class_export_put(export);
1137         RETURN(0);
1138 }
1139
1140 /* Return non-zero for a fully connected export */
1141 int class_connected_export(struct obd_export *exp)
1142 {
1143         if (exp) {
1144                 int connected;
1145                 spin_lock(&exp->exp_lock);
1146                 connected = (exp->exp_conn_cnt > 0);
1147                 spin_unlock(&exp->exp_lock);
1148                 return connected;
1149         }
1150         return 0;
1151 }
1152 EXPORT_SYMBOL(class_connected_export);
1153
1154 static void class_disconnect_export_list(struct list_head *list,
1155                                          enum obd_option flags)
1156 {
1157         int rc;
1158         struct obd_export *exp;
1159         ENTRY;
1160
1161         /* It's possible that an export may disconnect itself, but
1162          * nothing else will be added to this list. */
1163         while (!list_empty(list)) {
1164                 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1165                 /* need for safe call CDEBUG after obd_disconnect */
1166                 class_export_get(exp);
1167
1168                 spin_lock(&exp->exp_lock);
1169                 exp->exp_flags = flags;
1170                 spin_unlock(&exp->exp_lock);
1171
1172                 if (obd_uuid_equals(&exp->exp_client_uuid,
1173                                     &exp->exp_obd->obd_uuid)) {
1174                         CDEBUG(D_HA,
1175                                "exp %p export uuid == obd uuid, don't discon\n",
1176                                exp);
1177                         /* Need to delete this now so we don't end up pointing
1178                          * to work_list later when this export is cleaned up. */
1179                         list_del_init(&exp->exp_obd_chain);
1180                         class_export_put(exp);
1181                         continue;
1182                 }
1183
1184                 class_export_get(exp);
1185                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1186                        "last request at "CFS_TIME_T"\n",
1187                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1188                        exp, exp->exp_last_request_time);
1189                 /* release one export reference anyway */
1190                 rc = obd_disconnect(exp);
1191
1192                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1193                        obd_export_nid2str(exp), exp, rc);
1194                 class_export_put(exp);
1195         }
1196         EXIT;
1197 }
1198
1199 void class_disconnect_exports(struct obd_device *obd)
1200 {
1201         struct list_head work_list;
1202         ENTRY;
1203
1204         /* Move all of the exports from obd_exports to a work list, en masse. */
1205         CFS_INIT_LIST_HEAD(&work_list);
1206         spin_lock(&obd->obd_dev_lock);
1207         list_splice_init(&obd->obd_exports, &work_list);
1208         list_splice_init(&obd->obd_delayed_exports, &work_list);
1209         spin_unlock(&obd->obd_dev_lock);
1210
1211         if (!list_empty(&work_list)) {
1212                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1213                        "disconnecting them\n", obd->obd_minor, obd);
1214                 class_disconnect_export_list(&work_list,
1215                                              exp_flags_from_obd(obd));
1216         } else
1217                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1218                        obd->obd_minor, obd);
1219         EXIT;
1220 }
1221 EXPORT_SYMBOL(class_disconnect_exports);
1222
1223 /* Remove exports that have not completed recovery.
1224  */
1225 void class_disconnect_stale_exports(struct obd_device *obd,
1226                                     int (*test_export)(struct obd_export *))
1227 {
1228         struct list_head work_list;
1229         struct list_head *pos, *n;
1230         struct obd_export *exp;
1231         int evicted = 0;
1232         ENTRY;
1233
1234         CFS_INIT_LIST_HEAD(&work_list);
1235         spin_lock(&obd->obd_dev_lock);
1236         list_for_each_safe(pos, n, &obd->obd_exports) {
1237                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1238                 if (test_export(exp))
1239                         continue;
1240
1241                 /* don't count self-export as client */
1242                 if (obd_uuid_equals(&exp->exp_client_uuid,
1243                                     &exp->exp_obd->obd_uuid))
1244                         continue;
1245
1246                 list_move(&exp->exp_obd_chain, &work_list);
1247                 evicted++;
1248                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1249                        obd->obd_name, exp->exp_client_uuid.uuid,
1250                        exp->exp_connection == NULL ? "<unknown>" :
1251                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1252                 print_export_data(exp, "EVICTING", 0);
1253         }
1254         spin_unlock(&obd->obd_dev_lock);
1255
1256         if (evicted) {
1257                 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1258                        obd->obd_name, evicted);
1259                 obd->obd_stale_clients += evicted;
1260         }
1261         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1262                                                  OBD_OPT_ABORT_RECOV);
1263         EXIT;
1264 }
1265 EXPORT_SYMBOL(class_disconnect_stale_exports);
1266
1267 void class_fail_export(struct obd_export *exp)
1268 {
1269         int rc, already_failed;
1270
1271         spin_lock(&exp->exp_lock);
1272         already_failed = exp->exp_failed;
1273         exp->exp_failed = 1;
1274         spin_unlock(&exp->exp_lock);
1275
1276         if (already_failed) {
1277                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1278                        exp, exp->exp_client_uuid.uuid);
1279                 return;
1280         }
1281
1282         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1283                exp, exp->exp_client_uuid.uuid);
1284
1285         if (obd_dump_on_timeout)
1286                 libcfs_debug_dumplog();
1287
1288         /* Most callers into obd_disconnect are removing their own reference
1289          * (request, for example) in addition to the one from the hash table.
1290          * We don't have such a reference here, so make one. */
1291         class_export_get(exp);
1292         rc = obd_disconnect(exp);
1293         if (rc)
1294                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1295         else
1296                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1297                        exp, exp->exp_client_uuid.uuid);
1298 }
1299 EXPORT_SYMBOL(class_fail_export);
1300
1301 char *obd_export_nid2str(struct obd_export *exp)
1302 {
1303         if (exp->exp_connection != NULL)
1304                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1305
1306         return "(no nid)";
1307 }
1308 EXPORT_SYMBOL(obd_export_nid2str);
1309
1310 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1311 {
1312         struct obd_export *doomed_exp = NULL;
1313         int exports_evicted = 0;
1314
1315         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1316
1317         do {
1318                 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1319                 if (doomed_exp == NULL)
1320                         break;
1321
1322                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1323                          "nid %s found, wanted nid %s, requested nid %s\n",
1324                          obd_export_nid2str(doomed_exp),
1325                          libcfs_nid2str(nid_key), nid);
1326                 LASSERTF(doomed_exp != obd->obd_self_export,
1327                          "self-export is hashed by NID?\n");
1328                 exports_evicted++;
1329                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1330                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1331                        exports_evicted);
1332                 class_fail_export(doomed_exp);
1333                 class_export_put(doomed_exp);
1334         } while (1);
1335
1336         if (!exports_evicted)
1337                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1338                        obd->obd_name, nid);
1339         return exports_evicted;
1340 }
1341 EXPORT_SYMBOL(obd_export_evict_by_nid);
1342
1343 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1344 {
1345         struct obd_export *doomed_exp = NULL;
1346         struct obd_uuid doomed_uuid;
1347         int exports_evicted = 0;
1348
1349         obd_str2uuid(&doomed_uuid, uuid);
1350         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1351                 CERROR("%s: can't evict myself\n", obd->obd_name);
1352                 return exports_evicted;
1353         }
1354
1355         doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1356
1357         if (doomed_exp == NULL) {
1358                 CERROR("%s: can't disconnect %s: no exports found\n",
1359                        obd->obd_name, uuid);
1360         } else {
1361                 CWARN("%s: evicting %s at adminstrative request\n",
1362                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1363                 class_fail_export(doomed_exp);
1364                 class_export_put(doomed_exp);
1365                 exports_evicted++;
1366         }
1367
1368         return exports_evicted;
1369 }
1370 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1371
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback invoked by print_export_data() for each export when
 * lock dumping is requested; NULL means no hook is installed. */
void (*class_export_dump_hook)(struct obd_export *) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1376
1377 static void print_export_data(struct obd_export *exp, const char *status,
1378                               int locks)
1379 {
1380         struct ptlrpc_reply_state *rs;
1381         struct ptlrpc_reply_state *first_reply = NULL;
1382         int nreplies = 0;
1383
1384         spin_lock(&exp->exp_lock);
1385         list_for_each_entry (rs, &exp->exp_outstanding_replies, rs_exp_list) {
1386                 if (nreplies == 0)
1387                         first_reply = rs;
1388                 nreplies++;
1389         }
1390         spin_unlock(&exp->exp_lock);
1391
1392         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1393                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1394                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1395                atomic_read(&exp->exp_rpc_count),
1396                atomic_read(&exp->exp_cb_count),
1397                atomic_read(&exp->exp_locks_count),
1398                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1399                nreplies, first_reply, nreplies > 3 ? "..." : "",
1400                exp->exp_last_committed);
1401 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1402         if (locks && class_export_dump_hook != NULL)
1403                 class_export_dump_hook(exp);
1404 #endif
1405 }
1406
1407 void dump_exports(struct obd_device *obd, int locks)
1408 {
1409         struct obd_export *exp;
1410
1411         spin_lock(&obd->obd_dev_lock);
1412         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1413                 print_export_data(exp, "ACTIVE", locks);
1414         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1415                 print_export_data(exp, "UNLINKED", locks);
1416         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1417                 print_export_data(exp, "DELAYED", locks);
1418         spin_unlock(&obd->obd_dev_lock);
1419         spin_lock(&obd_zombie_impexp_lock);
1420         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1421                 print_export_data(exp, "ZOMBIE", locks);
1422         spin_unlock(&obd_zombie_impexp_lock);
1423 }
1424 EXPORT_SYMBOL(dump_exports);
1425
1426 void obd_exports_barrier(struct obd_device *obd)
1427 {
1428         int waited = 2;
1429         LASSERT(list_empty(&obd->obd_exports));
1430         spin_lock(&obd->obd_dev_lock);
1431         while (!list_empty(&obd->obd_unlinked_exports)) {
1432                 spin_unlock(&obd->obd_dev_lock);
1433                 cfs_schedule_timeout(CFS_TASK_UNINT, cfs_time_seconds(waited));
1434                 if (waited > 5 && IS_PO2(waited)) {
1435                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1436                                       "more than %d seconds. "
1437                                       "The obd refcount = %d. Is it stuck?\n",
1438                                       obd->obd_name, waited,
1439                                       atomic_read(&obd->obd_refcount));
1440                         dump_exports(obd, 0);
1441                 }
1442                 waited *= 2;
1443                 spin_lock(&obd->obd_dev_lock);
1444         }
1445         spin_unlock(&obd->obd_dev_lock);
1446 }
1447 EXPORT_SYMBOL(obd_exports_barrier);
1448
1449 /**
1450  * kill zombie imports and exports
1451  */
1452 void obd_zombie_impexp_cull(void)
1453 {
1454         struct obd_import *import;
1455         struct obd_export *export;
1456         ENTRY;
1457
1458         do {
1459                 spin_lock(&obd_zombie_impexp_lock);
1460
1461                 import = NULL;
1462                 if (!list_empty(&obd_zombie_imports)) {
1463                         import = list_entry(obd_zombie_imports.next,
1464                                             struct obd_import,
1465                                             imp_zombie_chain);
1466                         list_del_init(&import->imp_zombie_chain);
1467                 }
1468
1469                 export = NULL;
1470                 if (!list_empty(&obd_zombie_exports)) {
1471                         export = list_entry(obd_zombie_exports.next,
1472                                             struct obd_export,
1473                                             exp_obd_chain);
1474                         list_del_init(&export->exp_obd_chain);
1475                 }
1476
1477                 spin_unlock(&obd_zombie_impexp_lock);
1478
1479                 if (import != NULL)
1480                         class_import_destroy(import);
1481
1482                 if (export != NULL)
1483                         class_export_destroy(export);
1484
1485         } while (import != NULL || export != NULL);
1486         EXIT;
1487 }
1488
1489 static struct completion        obd_zombie_start;
1490 static struct completion        obd_zombie_stop;
1491 static unsigned long            obd_zombie_flags;
1492 static cfs_waitq_t              obd_zombie_waitq;
1493 static pid_t                    obd_zombie_pid;
1494
1495 enum {
1496         OBD_ZOMBIE_STOP   = 1 << 1
1497 };
1498
1499 /**
1500  * check for work for kill zombie import/export thread.
1501  */
1502 static int obd_zombie_impexp_check(void *arg)
1503 {
1504         int rc;
1505
1506         spin_lock(&obd_zombie_impexp_lock);
1507         rc = list_empty(&obd_zombie_imports) &&
1508              list_empty(&obd_zombie_exports) &&
1509              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1510
1511         spin_unlock(&obd_zombie_impexp_lock);
1512
1513         RETURN(rc);
1514 }
1515
1516 /**
1517  * Add export to the obd_zombe thread and notify it.
1518  */
1519 static void obd_zombie_export_add(struct obd_export *exp) {
1520         spin_lock(&exp->exp_obd->obd_dev_lock);
1521         LASSERT(!list_empty(&exp->exp_obd_chain));
1522         list_del_init(&exp->exp_obd_chain);
1523         spin_unlock(&exp->exp_obd->obd_dev_lock);
1524         spin_lock(&obd_zombie_impexp_lock);
1525         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1526         spin_unlock(&obd_zombie_impexp_lock);
1527
1528         if (obd_zombie_impexp_notify != NULL)
1529                 obd_zombie_impexp_notify();
1530 }
1531
1532 /**
1533  * Add import to the obd_zombe thread and notify it.
1534  */
1535 static void obd_zombie_import_add(struct obd_import *imp) {
1536         LASSERT(imp->imp_sec == NULL);
1537         spin_lock(&obd_zombie_impexp_lock);
1538         LASSERT(list_empty(&imp->imp_zombie_chain));
1539         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1540         spin_unlock(&obd_zombie_impexp_lock);
1541
1542         if (obd_zombie_impexp_notify != NULL)
1543                 obd_zombie_impexp_notify();
1544 }
1545
1546 /**
1547  * notify import/export destroy thread about new zombie.
1548  */
1549 static void obd_zombie_impexp_notify(void)
1550 {
1551         cfs_waitq_signal(&obd_zombie_waitq);
1552 }
1553
1554 /**
1555  * check whether obd_zombie is idle
1556  */
1557 static int obd_zombie_is_idle(void)
1558 {
1559         int rc;
1560
1561         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1562         spin_lock(&obd_zombie_impexp_lock);
1563         rc = list_empty(&obd_zombie_imports) &&
1564              list_empty(&obd_zombie_exports);
1565         spin_unlock(&obd_zombie_impexp_lock);
1566         return rc;
1567 }
1568
1569 /**
1570  * wait when obd_zombie import/export queues become empty
1571  */
1572 void obd_zombie_barrier(void)
1573 {
1574         struct l_wait_info lwi = { 0 };
1575
1576         if (obd_zombie_pid == cfs_curproc_pid())
1577                 /* don't wait for myself */
1578                 return;
1579         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1580 }
1581 EXPORT_SYMBOL(obd_zombie_barrier);
1582
1583 #ifdef __KERNEL__
1584
1585 /**
1586  * destroy zombie export/import thread.
1587  */
1588 static int obd_zombie_impexp_thread(void *unused)
1589 {
1590         int rc;
1591
1592         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1593                 complete(&obd_zombie_start);
1594                 RETURN(rc);
1595         }
1596
1597         complete(&obd_zombie_start);
1598
1599         obd_zombie_pid = cfs_curproc_pid();
1600
1601         while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1602                 struct l_wait_info lwi = { 0 };
1603
1604                 l_wait_event(obd_zombie_waitq,
1605                              !obd_zombie_impexp_check(NULL), &lwi);
1606                 obd_zombie_impexp_cull();
1607
1608                 /*
1609                  * Notify obd_zombie_barrier callers that queues
1610                  * may be empty.
1611                  */
1612                 cfs_waitq_signal(&obd_zombie_waitq);
1613         }
1614
1615         complete(&obd_zombie_stop);
1616
1617         RETURN(0);
1618 }
1619
1620 #else /* ! KERNEL */
1621
1622 static atomic_t zombie_recur = ATOMIC_INIT(0);
1623 static void *obd_zombie_impexp_work_cb;
1624 static void *obd_zombie_impexp_idle_cb;
1625
1626 int obd_zombie_impexp_kill(void *arg)
1627 {
1628         int rc = 0;
1629
1630         if (atomic_inc_return(&zombie_recur) == 1) {
1631                 obd_zombie_impexp_cull();
1632                 rc = 1;
1633         }
1634         atomic_dec(&zombie_recur);
1635         return rc;
1636 }
1637
1638 #endif
1639
1640 /**
1641  * start destroy zombie import/export thread
1642  */
1643 int obd_zombie_impexp_init(void)
1644 {
1645         int rc;
1646
1647         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1648         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1649         spin_lock_init(&obd_zombie_impexp_lock);
1650         init_completion(&obd_zombie_start);
1651         init_completion(&obd_zombie_stop);
1652         cfs_waitq_init(&obd_zombie_waitq);
1653         obd_zombie_pid = 0;
1654
1655 #ifdef __KERNEL__
1656         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1657         if (rc < 0)
1658                 RETURN(rc);
1659
1660         wait_for_completion(&obd_zombie_start);
1661 #else
1662
1663         obd_zombie_impexp_work_cb =
1664                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1665                                                  &obd_zombie_impexp_kill, NULL);
1666
1667         obd_zombie_impexp_idle_cb =
1668                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1669                                                  &obd_zombie_impexp_check, NULL);
1670         rc = 0;
1671 #endif
1672         RETURN(rc);
1673 }
1674 /**
1675  * stop destroy zombie import/export thread
1676  */
1677 void obd_zombie_impexp_stop(void)
1678 {
1679         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1680         obd_zombie_impexp_notify();
1681 #ifdef __KERNEL__
1682         wait_for_completion(&obd_zombie_stop);
1683 #else
1684         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1685         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1686 #endif
1687 }
1688