Whamcloud - gitweb
LU-1158 general: added nanosecond OBD connect flag
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
50 extern cfs_list_t obd_types;
51 cfs_spinlock_t obd_types_lock;
52
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
57
58 cfs_list_t      obd_zombie_imports;
59 cfs_list_t      obd_zombie_exports;
60 cfs_spinlock_t  obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         cfs_list_t *tmp;
102         struct obd_type *type;
103
104         cfs_spin_lock(&obd_types_lock);
105         cfs_list_for_each(tmp, &obd_types) {
106                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         cfs_spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         cfs_spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115
116 struct obd_type *class_get_type(const char *name)
117 {
118         struct obd_type *type = class_search_type(name);
119
120 #ifdef HAVE_MODULE_LOADING_SUPPORT
121         if (!type) {
122                 const char *modname = name;
123                 if (!cfs_request_module("%s", modname)) {
124                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
125                         type = class_search_type(name);
126                 } else {
127                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
128                                            modname);
129                 }
130         }
131 #endif
132         if (type) {
133                 cfs_spin_lock(&type->obd_type_lock);
134                 type->typ_refcnt++;
135                 cfs_try_module_get(type->typ_dt_ops->o_owner);
136                 cfs_spin_unlock(&type->obd_type_lock);
137         }
138         return type;
139 }
140 EXPORT_SYMBOL(class_get_type);
141
142 void class_put_type(struct obd_type *type)
143 {
144         LASSERT(type);
145         cfs_spin_lock(&type->obd_type_lock);
146         type->typ_refcnt--;
147         cfs_module_put(type->typ_dt_ops->o_owner);
148         cfs_spin_unlock(&type->obd_type_lock);
149 }
150 EXPORT_SYMBOL(class_put_type);
151
152 #define CLASS_MAX_NAME 1024
153
154 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
155                         struct lprocfs_vars *vars, const char *name,
156                         struct lu_device_type *ldt)
157 {
158         struct obd_type *type;
159         int rc = 0;
160         ENTRY;
161
162         /* sanity check */
163         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
164
165         if (class_search_type(name)) {
166                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
167                 RETURN(-EEXIST);
168         }
169
170         rc = -ENOMEM;
171         OBD_ALLOC(type, sizeof(*type));
172         if (type == NULL)
173                 RETURN(rc);
174
175         OBD_ALLOC_PTR(type->typ_dt_ops);
176         OBD_ALLOC_PTR(type->typ_md_ops);
177         OBD_ALLOC(type->typ_name, strlen(name) + 1);
178
179         if (type->typ_dt_ops == NULL ||
180             type->typ_md_ops == NULL ||
181             type->typ_name == NULL)
182                 GOTO (failed, rc);
183
184         *(type->typ_dt_ops) = *dt_ops;
185         /* md_ops is optional */
186         if (md_ops)
187                 *(type->typ_md_ops) = *md_ops;
188         strcpy(type->typ_name, name);
189         cfs_spin_lock_init(&type->obd_type_lock);
190
191 #ifdef LPROCFS
192         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
193                                               vars, type);
194         if (IS_ERR(type->typ_procroot)) {
195                 rc = PTR_ERR(type->typ_procroot);
196                 type->typ_procroot = NULL;
197                 GOTO (failed, rc);
198         }
199 #endif
200         if (ldt != NULL) {
201                 type->typ_lu = ldt;
202                 rc = lu_device_type_init(ldt);
203                 if (rc != 0)
204                         GOTO (failed, rc);
205         }
206
207         cfs_spin_lock(&obd_types_lock);
208         cfs_list_add(&type->typ_chain, &obd_types);
209         cfs_spin_unlock(&obd_types_lock);
210
211         RETURN (0);
212
213  failed:
214         if (type->typ_name != NULL)
215                 OBD_FREE(type->typ_name, strlen(name) + 1);
216         if (type->typ_md_ops != NULL)
217                 OBD_FREE_PTR(type->typ_md_ops);
218         if (type->typ_dt_ops != NULL)
219                 OBD_FREE_PTR(type->typ_dt_ops);
220         OBD_FREE(type, sizeof(*type));
221         RETURN(rc);
222 }
223 EXPORT_SYMBOL(class_register_type);
224
225 int class_unregister_type(const char *name)
226 {
227         struct obd_type *type = class_search_type(name);
228         ENTRY;
229
230         if (!type) {
231                 CERROR("unknown obd type\n");
232                 RETURN(-EINVAL);
233         }
234
235         if (type->typ_refcnt) {
236                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
237                 /* This is a bad situation, let's make the best of it */
238                 /* Remove ops, but leave the name for debugging */
239                 OBD_FREE_PTR(type->typ_dt_ops);
240                 OBD_FREE_PTR(type->typ_md_ops);
241                 RETURN(-EBUSY);
242         }
243
244         if (type->typ_procroot) {
245                 lprocfs_remove(&type->typ_procroot);
246         }
247
248         if (type->typ_lu)
249                 lu_device_type_fini(type->typ_lu);
250
251         cfs_spin_lock(&obd_types_lock);
252         cfs_list_del(&type->typ_chain);
253         cfs_spin_unlock(&obd_types_lock);
254         OBD_FREE(type->typ_name, strlen(name) + 1);
255         if (type->typ_dt_ops != NULL)
256                 OBD_FREE_PTR(type->typ_dt_ops);
257         if (type->typ_md_ops != NULL)
258                 OBD_FREE_PTR(type->typ_md_ops);
259         OBD_FREE(type, sizeof(*type));
260         RETURN(0);
261 } /* class_unregister_type */
262 EXPORT_SYMBOL(class_unregister_type);
263
264 /**
265  * Create a new obd device.
266  *
267  * Find an empty slot in ::obd_devs[], create a new obd device in it.
268  *
269  * \param[in] type_name obd device type string.
270  * \param[in] name      obd device name.
271  *
272  * \retval NULL if create fails, otherwise return the obd device
273  *         pointer created.
274  */
275 struct obd_device *class_newdev(const char *type_name, const char *name)
276 {
277         struct obd_device *result = NULL;
278         struct obd_device *newdev;
279         struct obd_type *type = NULL;
280         int i;
281         int new_obd_minor = 0;
282         ENTRY;
283
284         if (strlen(name) >= MAX_OBD_NAME) {
285                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
286                 RETURN(ERR_PTR(-EINVAL));
287         }
288
289         type = class_get_type(type_name);
290         if (type == NULL){
291                 CERROR("OBD: unknown type: %s\n", type_name);
292                 RETURN(ERR_PTR(-ENODEV));
293         }
294
295         newdev = obd_device_alloc();
296         if (newdev == NULL) {
297                 class_put_type(type);
298                 RETURN(ERR_PTR(-ENOMEM));
299         }
300         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
301
302         cfs_write_lock(&obd_dev_lock);
303         for (i = 0; i < class_devno_max(); i++) {
304                 struct obd_device *obd = class_num2obd(i);
305
306                 if (obd && obd->obd_name &&
307                     (strcmp(name, obd->obd_name) == 0)) {
308                         CERROR("Device %s already exists at %d, won't add\n",
309                                name, i);
310                         if (result) {
311                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
312                                          "%p obd_magic %08x != %08x\n", result,
313                                          result->obd_magic, OBD_DEVICE_MAGIC);
314                                 LASSERTF(result->obd_minor == new_obd_minor,
315                                          "%p obd_minor %d != %d\n", result,
316                                          result->obd_minor, new_obd_minor);
317
318                                 obd_devs[result->obd_minor] = NULL;
319                                 result->obd_name[0]='\0';
320                          }
321                         result = ERR_PTR(-EEXIST);
322                         break;
323                 }
324                 if (!result && !obd) {
325                         result = newdev;
326                         result->obd_minor = i;
327                         new_obd_minor = i;
328                         result->obd_type = type;
329                         strncpy(result->obd_name, name,
330                                 sizeof(result->obd_name) - 1);
331                         obd_devs[i] = result;
332                 }
333         }
334         cfs_write_unlock(&obd_dev_lock);
335
336         if (result == NULL && i >= class_devno_max()) {
337                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
338                        class_devno_max());
339                 RETURN(ERR_PTR(-EOVERFLOW));
340         }
341
342         if (IS_ERR(result)) {
343                 obd_device_free(newdev);
344                 class_put_type(type);
345         } else {
346                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
347                        result->obd_name, result);
348         }
349         RETURN(result);
350 }
351
352 void class_release_dev(struct obd_device *obd)
353 {
354         struct obd_type *obd_type = obd->obd_type;
355
356         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
357                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
358         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
359                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
360         LASSERT(obd_type != NULL);
361
362         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
363                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
364
365         cfs_write_lock(&obd_dev_lock);
366         obd_devs[obd->obd_minor] = NULL;
367         cfs_write_unlock(&obd_dev_lock);
368         obd_device_free(obd);
369
370         class_put_type(obd_type);
371 }
372
373 int class_name2dev(const char *name)
374 {
375         int i;
376
377         if (!name)
378                 return -1;
379
380         cfs_read_lock(&obd_dev_lock);
381         for (i = 0; i < class_devno_max(); i++) {
382                 struct obd_device *obd = class_num2obd(i);
383
384                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
385                         /* Make sure we finished attaching before we give
386                            out any references */
387                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
388                         if (obd->obd_attached) {
389                                 cfs_read_unlock(&obd_dev_lock);
390                                 return i;
391                         }
392                         break;
393                 }
394         }
395         cfs_read_unlock(&obd_dev_lock);
396
397         return -1;
398 }
399 EXPORT_SYMBOL(class_name2dev);
400
401 struct obd_device *class_name2obd(const char *name)
402 {
403         int dev = class_name2dev(name);
404
405         if (dev < 0 || dev > class_devno_max())
406                 return NULL;
407         return class_num2obd(dev);
408 }
409 EXPORT_SYMBOL(class_name2obd);
410
411 int class_uuid2dev(struct obd_uuid *uuid)
412 {
413         int i;
414
415         cfs_read_lock(&obd_dev_lock);
416         for (i = 0; i < class_devno_max(); i++) {
417                 struct obd_device *obd = class_num2obd(i);
418
419                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
420                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
421                         cfs_read_unlock(&obd_dev_lock);
422                         return i;
423                 }
424         }
425         cfs_read_unlock(&obd_dev_lock);
426
427         return -1;
428 }
429 EXPORT_SYMBOL(class_uuid2dev);
430
431 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
432 {
433         int dev = class_uuid2dev(uuid);
434         if (dev < 0)
435                 return NULL;
436         return class_num2obd(dev);
437 }
438 EXPORT_SYMBOL(class_uuid2obd);
439
440 /**
441  * Get obd device from ::obd_devs[]
442  *
443  * \param num [in] array index
444  *
445  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
446  *         otherwise return the obd device there.
447  */
448 struct obd_device *class_num2obd(int num)
449 {
450         struct obd_device *obd = NULL;
451
452         if (num < class_devno_max()) {
453                 obd = obd_devs[num];
454                 if (obd == NULL)
455                         return NULL;
456
457                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
458                          "%p obd_magic %08x != %08x\n",
459                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
460                 LASSERTF(obd->obd_minor == num,
461                          "%p obd_minor %0d != %0d\n",
462                          obd, obd->obd_minor, num);
463         }
464
465         return obd;
466 }
467 EXPORT_SYMBOL(class_num2obd);
468
469 void class_obd_list(void)
470 {
471         char *status;
472         int i;
473
474         cfs_read_lock(&obd_dev_lock);
475         for (i = 0; i < class_devno_max(); i++) {
476                 struct obd_device *obd = class_num2obd(i);
477
478                 if (obd == NULL)
479                         continue;
480                 if (obd->obd_stopping)
481                         status = "ST";
482                 else if (obd->obd_set_up)
483                         status = "UP";
484                 else if (obd->obd_attached)
485                         status = "AT";
486                 else
487                         status = "--";
488                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
489                          i, status, obd->obd_type->typ_name,
490                          obd->obd_name, obd->obd_uuid.uuid,
491                          cfs_atomic_read(&obd->obd_refcount));
492         }
493         cfs_read_unlock(&obd_dev_lock);
494         return;
495 }
496
497 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
498    specified, then only the client with that uuid is returned,
499    otherwise any client connected to the tgt is returned. */
500 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
501                                           const char * typ_name,
502                                           struct obd_uuid *grp_uuid)
503 {
504         int i;
505
506         cfs_read_lock(&obd_dev_lock);
507         for (i = 0; i < class_devno_max(); i++) {
508                 struct obd_device *obd = class_num2obd(i);
509
510                 if (obd == NULL)
511                         continue;
512                 if ((strncmp(obd->obd_type->typ_name, typ_name,
513                              strlen(typ_name)) == 0)) {
514                         if (obd_uuid_equals(tgt_uuid,
515                                             &obd->u.cli.cl_target_uuid) &&
516                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
517                                                          &obd->obd_uuid) : 1)) {
518                                 cfs_read_unlock(&obd_dev_lock);
519                                 return obd;
520                         }
521                 }
522         }
523         cfs_read_unlock(&obd_dev_lock);
524
525         return NULL;
526 }
527 EXPORT_SYMBOL(class_find_client_obd);
528
529 /* Iterate the obd_device list looking devices have grp_uuid. Start
530    searching at *next, and if a device is found, the next index to look
531    at is saved in *next. If next is NULL, then the first matching device
532    will always be returned. */
533 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
534 {
535         int i;
536
537         if (next == NULL)
538                 i = 0;
539         else if (*next >= 0 && *next < class_devno_max())
540                 i = *next;
541         else
542                 return NULL;
543
544         cfs_read_lock(&obd_dev_lock);
545         for (; i < class_devno_max(); i++) {
546                 struct obd_device *obd = class_num2obd(i);
547
548                 if (obd == NULL)
549                         continue;
550                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
551                         if (next != NULL)
552                                 *next = i+1;
553                         cfs_read_unlock(&obd_dev_lock);
554                         return obd;
555                 }
556         }
557         cfs_read_unlock(&obd_dev_lock);
558
559         return NULL;
560 }
561 EXPORT_SYMBOL(class_devices_in_group);
562
563 /**
564  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
565  * adjust sptlrpc settings accordingly.
566  */
567 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
568 {
569         struct obd_device  *obd;
570         const char         *type;
571         int                 i, rc = 0, rc2;
572
573         LASSERT(namelen > 0);
574
575         cfs_read_lock(&obd_dev_lock);
576         for (i = 0; i < class_devno_max(); i++) {
577                 obd = class_num2obd(i);
578
579                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
580                         continue;
581
582                 /* only notify mdc, osc, mdt, ost */
583                 type = obd->obd_type->typ_name;
584                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
585                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
586                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
587                     strcmp(type, LUSTRE_OST_NAME) != 0)
588                         continue;
589
590                 if (strncmp(obd->obd_name, fsname, namelen))
591                         continue;
592
593                 class_incref(obd, __FUNCTION__, obd);
594                 cfs_read_unlock(&obd_dev_lock);
595                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
596                                          sizeof(KEY_SPTLRPC_CONF),
597                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
598                 rc = rc ? rc : rc2;
599                 class_decref(obd, __FUNCTION__, obd);
600                 cfs_read_lock(&obd_dev_lock);
601         }
602         cfs_read_unlock(&obd_dev_lock);
603         return rc;
604 }
605 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
606
607 void obd_cleanup_caches(void)
608 {
609         int rc;
610
611         ENTRY;
612         if (obd_device_cachep) {
613                 rc = cfs_mem_cache_destroy(obd_device_cachep);
614                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
615                 obd_device_cachep = NULL;
616         }
617         if (obdo_cachep) {
618                 rc = cfs_mem_cache_destroy(obdo_cachep);
619                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
620                 obdo_cachep = NULL;
621         }
622         if (import_cachep) {
623                 rc = cfs_mem_cache_destroy(import_cachep);
624                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
625                 import_cachep = NULL;
626         }
627         if (capa_cachep) {
628                 rc = cfs_mem_cache_destroy(capa_cachep);
629                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
630                 capa_cachep = NULL;
631         }
632         EXIT;
633 }
634
635 int obd_init_caches(void)
636 {
637         ENTRY;
638
639         LASSERT(obd_device_cachep == NULL);
640         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
641                                                  sizeof(struct obd_device),
642                                                  0, 0);
643         if (!obd_device_cachep)
644                 GOTO(out, -ENOMEM);
645
646         LASSERT(obdo_cachep == NULL);
647         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
648                                            0, 0);
649         if (!obdo_cachep)
650                 GOTO(out, -ENOMEM);
651
652         LASSERT(import_cachep == NULL);
653         import_cachep = cfs_mem_cache_create("ll_import_cache",
654                                              sizeof(struct obd_import),
655                                              0, 0);
656         if (!import_cachep)
657                 GOTO(out, -ENOMEM);
658
659         LASSERT(capa_cachep == NULL);
660         capa_cachep = cfs_mem_cache_create("capa_cache",
661                                            sizeof(struct obd_capa), 0, 0);
662         if (!capa_cachep)
663                 GOTO(out, -ENOMEM);
664
665         RETURN(0);
666  out:
667         obd_cleanup_caches();
668         RETURN(-ENOMEM);
669
670 }
671
672 /* map connection to client */
673 struct obd_export *class_conn2export(struct lustre_handle *conn)
674 {
675         struct obd_export *export;
676         ENTRY;
677
678         if (!conn) {
679                 CDEBUG(D_CACHE, "looking for null handle\n");
680                 RETURN(NULL);
681         }
682
683         if (conn->cookie == -1) {  /* this means assign a new connection */
684                 CDEBUG(D_CACHE, "want a new connection\n");
685                 RETURN(NULL);
686         }
687
688         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
689         export = class_handle2object(conn->cookie);
690         RETURN(export);
691 }
692 EXPORT_SYMBOL(class_conn2export);
693
694 struct obd_device *class_exp2obd(struct obd_export *exp)
695 {
696         if (exp)
697                 return exp->exp_obd;
698         return NULL;
699 }
700 EXPORT_SYMBOL(class_exp2obd);
701
702 struct obd_device *class_conn2obd(struct lustre_handle *conn)
703 {
704         struct obd_export *export;
705         export = class_conn2export(conn);
706         if (export) {
707                 struct obd_device *obd = export->exp_obd;
708                 class_export_put(export);
709                 return obd;
710         }
711         return NULL;
712 }
713 EXPORT_SYMBOL(class_conn2obd);
714
715 struct obd_import *class_exp2cliimp(struct obd_export *exp)
716 {
717         struct obd_device *obd = exp->exp_obd;
718         if (obd == NULL)
719                 return NULL;
720         return obd->u.cli.cl_import;
721 }
722 EXPORT_SYMBOL(class_exp2cliimp);
723
724 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
725 {
726         struct obd_device *obd = class_conn2obd(conn);
727         if (obd == NULL)
728                 return NULL;
729         return obd->u.cli.cl_import;
730 }
731 EXPORT_SYMBOL(class_conn2cliimp);
732
733 /* Export management functions */
734
735 /* if export is involved in recovery then clean up related things */
736 void class_export_recovery_cleanup(struct obd_export *exp)
737 {
738         struct obd_device *obd = exp->exp_obd;
739
740         cfs_spin_lock(&obd->obd_recovery_task_lock);
741         if (exp->exp_delayed)
742                 obd->obd_delayed_clients--;
743         if (obd->obd_recovering && exp->exp_in_recovery) {
744                 cfs_spin_lock(&exp->exp_lock);
745                 exp->exp_in_recovery = 0;
746                 cfs_spin_unlock(&exp->exp_lock);
747                 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
748                 cfs_atomic_dec(&obd->obd_connected_clients);
749         }
750         cfs_spin_unlock(&obd->obd_recovery_task_lock);
751         /** Cleanup req replay fields */
752         if (exp->exp_req_replay_needed) {
753                 cfs_spin_lock(&exp->exp_lock);
754                 exp->exp_req_replay_needed = 0;
755                 cfs_spin_unlock(&exp->exp_lock);
756                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
757                 cfs_atomic_dec(&obd->obd_req_replay_clients);
758         }
759         /** Cleanup lock replay data */
760         if (exp->exp_lock_replay_needed) {
761                 cfs_spin_lock(&exp->exp_lock);
762                 exp->exp_lock_replay_needed = 0;
763                 cfs_spin_unlock(&exp->exp_lock);
764                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
765                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
766         }
767 }
768
769 static void class_export_destroy(struct obd_export *exp)
770 {
771         struct obd_device *obd = exp->exp_obd;
772         ENTRY;
773
774         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
775         LASSERT(obd != NULL);
776
777         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
778                exp->exp_client_uuid.uuid, obd->obd_name);
779
780
781         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
782         if (exp->exp_connection)
783                 ptlrpc_put_connection_superhack(exp->exp_connection);
784
785         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
786         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
787         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
788         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
789         obd_destroy_export(exp);
790         class_decref(obd, "export", exp);
791
792         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
793         EXIT;
794 }
795
796 static void export_handle_addref(void *export)
797 {
798         class_export_get(export);
799 }
800
801 static struct portals_handle_ops export_handle_ops = {
802         .hop_addref = export_handle_addref,
803         .hop_free   = NULL,
804 };
805
806 struct obd_export *class_export_get(struct obd_export *exp)
807 {
808         cfs_atomic_inc(&exp->exp_refcount);
809         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
810                cfs_atomic_read(&exp->exp_refcount));
811         return exp;
812 }
813 EXPORT_SYMBOL(class_export_get);
814
815 void class_export_put(struct obd_export *exp)
816 {
817         LASSERT(exp != NULL);
818         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
819         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
820                cfs_atomic_read(&exp->exp_refcount) - 1);
821
822         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
823                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
824                 CDEBUG(D_IOCTL, "final put %p/%s\n",
825                        exp, exp->exp_client_uuid.uuid);
826
827                 /* release nid stat refererence */
828                 lprocfs_exp_cleanup(exp);
829                 class_export_recovery_cleanup(exp);
830
831                 obd_zombie_export_add(exp);
832         }
833 }
834 EXPORT_SYMBOL(class_export_put);
835
836 /* Creates a new export, adds it to the hash table, and returns a
837  * pointer to it. The refcount is 2: one for the hash reference, and
838  * one for the pointer returned by this function. */
839 struct obd_export *class_new_export(struct obd_device *obd,
840                                     struct obd_uuid *cluuid)
841 {
842         struct obd_export *export;
843         cfs_hash_t *hash = NULL;
844         int rc = 0;
845         ENTRY;
846
847         OBD_ALLOC_PTR(export);
848         if (!export)
849                 return ERR_PTR(-ENOMEM);
850
851         export->exp_conn_cnt = 0;
852         export->exp_lock_hash = NULL;
853         export->exp_flock_hash = NULL;
854         cfs_atomic_set(&export->exp_refcount, 2);
855         cfs_atomic_set(&export->exp_rpc_count, 0);
856         cfs_atomic_set(&export->exp_cb_count, 0);
857         cfs_atomic_set(&export->exp_locks_count, 0);
858 #if LUSTRE_TRACKS_LOCK_EXP_REFS
859         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
860         cfs_spin_lock_init(&export->exp_locks_list_guard);
861 #endif
862         cfs_atomic_set(&export->exp_replay_count, 0);
863         export->exp_obd = obd;
864         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
865         cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
866         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
867         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
868         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
869         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
870         class_handle_hash(&export->exp_handle, &export_handle_ops);
871         export->exp_last_request_time = cfs_time_current_sec();
872         cfs_spin_lock_init(&export->exp_lock);
873         cfs_spin_lock_init(&export->exp_rpc_lock);
874         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
875         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
876         cfs_spin_lock_init(&export->exp_bl_list_lock);
877         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
878
879         export->exp_sp_peer = LUSTRE_SP_ANY;
880         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
881         export->exp_client_uuid = *cluuid;
882         obd_init_export(export);
883
884         cfs_spin_lock(&obd->obd_dev_lock);
885          /* shouldn't happen, but might race */
886         if (obd->obd_stopping)
887                 GOTO(exit_unlock, rc = -ENODEV);
888
889         hash = cfs_hash_getref(obd->obd_uuid_hash);
890         if (hash == NULL)
891                 GOTO(exit_unlock, rc = -ENODEV);
892         cfs_spin_unlock(&obd->obd_dev_lock);
893
894         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
895                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
896                 if (rc != 0) {
897                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
898                                       obd->obd_name, cluuid->uuid, rc);
899                         GOTO(exit_err, rc = -EALREADY);
900                 }
901         }
902
903         cfs_spin_lock(&obd->obd_dev_lock);
904         if (obd->obd_stopping) {
905                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
906                 GOTO(exit_unlock, rc = -ENODEV);
907         }
908
909         class_incref(obd, "export", export);
910         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
911         cfs_list_add_tail(&export->exp_obd_chain_timed,
912                           &export->exp_obd->obd_exports_timed);
913         export->exp_obd->obd_num_exports++;
914         cfs_spin_unlock(&obd->obd_dev_lock);
915         cfs_hash_putref(hash);
916         RETURN(export);
917
918 exit_unlock:
919         cfs_spin_unlock(&obd->obd_dev_lock);
920 exit_err:
921         if (hash)
922                 cfs_hash_putref(hash);
923         class_handle_unhash(&export->exp_handle);
924         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
925         obd_destroy_export(export);
926         OBD_FREE_PTR(export);
927         return ERR_PTR(rc);
928 }
929 EXPORT_SYMBOL(class_new_export);
930
931 void class_unlink_export(struct obd_export *exp)
932 {
933         class_handle_unhash(&exp->exp_handle);
934
935         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
936         /* delete an uuid-export hashitem from hashtables */
937         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
938                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
939                              &exp->exp_client_uuid,
940                              &exp->exp_uuid_hash);
941
942         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
943         cfs_list_del_init(&exp->exp_obd_chain_timed);
944         exp->exp_obd->obd_num_exports--;
945         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
946         class_export_put(exp);
947 }
948 EXPORT_SYMBOL(class_unlink_export);
949
950 /* Import management functions */
951 void class_import_destroy(struct obd_import *imp)
952 {
953         ENTRY;
954
955         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
956                 imp->imp_obd->obd_name);
957
958         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
959
960         ptlrpc_put_connection_superhack(imp->imp_connection);
961
962         while (!cfs_list_empty(&imp->imp_conn_list)) {
963                 struct obd_import_conn *imp_conn;
964
965                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
966                                           struct obd_import_conn, oic_item);
967                 cfs_list_del_init(&imp_conn->oic_item);
968                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
969                 OBD_FREE(imp_conn, sizeof(*imp_conn));
970         }
971
972         LASSERT(imp->imp_sec == NULL);
973         class_decref(imp->imp_obd, "import", imp);
974         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
975         EXIT;
976 }
977
978 static void import_handle_addref(void *import)
979 {
980         class_import_get(import);
981 }
982
983 static struct portals_handle_ops import_handle_ops = {
984         .hop_addref = import_handle_addref,
985         .hop_free   = NULL,
986 };
987
988 struct obd_import *class_import_get(struct obd_import *import)
989 {
990         cfs_atomic_inc(&import->imp_refcount);
991         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
992                cfs_atomic_read(&import->imp_refcount),
993                import->imp_obd->obd_name);
994         return import;
995 }
996 EXPORT_SYMBOL(class_import_get);
997
998 void class_import_put(struct obd_import *imp)
999 {
1000         ENTRY;
1001
1002         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1003         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1004
1005         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1006                cfs_atomic_read(&imp->imp_refcount) - 1,
1007                imp->imp_obd->obd_name);
1008
1009         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1010                 CDEBUG(D_INFO, "final put import %p\n", imp);
1011                 obd_zombie_import_add(imp);
1012         }
1013
1014         EXIT;
1015 }
1016 EXPORT_SYMBOL(class_import_put);
1017
1018 static void init_imp_at(struct imp_at *at) {
1019         int i;
1020         at_init(&at->iat_net_latency, 0, 0);
1021         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1022                 /* max service estimates are tracked on the server side, so
1023                    don't use the AT history here, just use the last reported
1024                    val. (But keep hist for proc histogram, worst_ever) */
1025                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1026                         AT_FLG_NOHIST);
1027         }
1028 }
1029
1030 struct obd_import *class_new_import(struct obd_device *obd)
1031 {
1032         struct obd_import *imp;
1033
1034         OBD_ALLOC(imp, sizeof(*imp));
1035         if (imp == NULL)
1036                 return NULL;
1037
1038         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1039         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1040         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1041         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1042         cfs_spin_lock_init(&imp->imp_lock);
1043         imp->imp_last_success_conn = 0;
1044         imp->imp_state = LUSTRE_IMP_NEW;
1045         imp->imp_obd = class_incref(obd, "import", imp);
1046         cfs_mutex_init(&imp->imp_sec_mutex);
1047         cfs_waitq_init(&imp->imp_recovery_waitq);
1048
1049         cfs_atomic_set(&imp->imp_refcount, 2);
1050         cfs_atomic_set(&imp->imp_unregistering, 0);
1051         cfs_atomic_set(&imp->imp_inflight, 0);
1052         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1053         cfs_atomic_set(&imp->imp_inval_count, 0);
1054         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1055         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1056         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1057         init_imp_at(&imp->imp_at);
1058
1059         /* the default magic is V2, will be used in connect RPC, and
1060          * then adjusted according to the flags in request/reply. */
1061         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1062
1063         return imp;
1064 }
1065 EXPORT_SYMBOL(class_new_import);
1066
1067 void class_destroy_import(struct obd_import *import)
1068 {
1069         LASSERT(import != NULL);
1070         LASSERT(import != LP_POISON);
1071
1072         class_handle_unhash(&import->imp_handle);
1073
1074         cfs_spin_lock(&import->imp_lock);
1075         import->imp_generation++;
1076         cfs_spin_unlock(&import->imp_lock);
1077         class_import_put(import);
1078 }
1079 EXPORT_SYMBOL(class_destroy_import);
1080
1081 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1082
1083 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1084 {
1085         cfs_spin_lock(&exp->exp_locks_list_guard);
1086
1087         LASSERT(lock->l_exp_refs_nr >= 0);
1088
1089         if (lock->l_exp_refs_target != NULL &&
1090             lock->l_exp_refs_target != exp) {
1091                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1092                               exp, lock, lock->l_exp_refs_target);
1093         }
1094         if ((lock->l_exp_refs_nr ++) == 0) {
1095                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1096                 lock->l_exp_refs_target = exp;
1097         }
1098         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1099                lock, exp, lock->l_exp_refs_nr);
1100         cfs_spin_unlock(&exp->exp_locks_list_guard);
1101 }
1102 EXPORT_SYMBOL(__class_export_add_lock_ref);
1103
1104 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1105 {
1106         cfs_spin_lock(&exp->exp_locks_list_guard);
1107         LASSERT(lock->l_exp_refs_nr > 0);
1108         if (lock->l_exp_refs_target != exp) {
1109                 LCONSOLE_WARN("lock %p, "
1110                               "mismatching export pointers: %p, %p\n",
1111                               lock, lock->l_exp_refs_target, exp);
1112         }
1113         if (-- lock->l_exp_refs_nr == 0) {
1114                 cfs_list_del_init(&lock->l_exp_refs_link);
1115                 lock->l_exp_refs_target = NULL;
1116         }
1117         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1118                lock, exp, lock->l_exp_refs_nr);
1119         cfs_spin_unlock(&exp->exp_locks_list_guard);
1120 }
1121 EXPORT_SYMBOL(__class_export_del_lock_ref);
1122 #endif
1123
1124 /* A connection defines an export context in which preallocation can
1125    be managed. This releases the export pointer reference, and returns
1126    the export handle, so the export refcount is 1 when this function
1127    returns. */
1128 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1129                   struct obd_uuid *cluuid)
1130 {
1131         struct obd_export *export;
1132         LASSERT(conn != NULL);
1133         LASSERT(obd != NULL);
1134         LASSERT(cluuid != NULL);
1135         ENTRY;
1136
1137         export = class_new_export(obd, cluuid);
1138         if (IS_ERR(export))
1139                 RETURN(PTR_ERR(export));
1140
1141         conn->cookie = export->exp_handle.h_cookie;
1142         class_export_put(export);
1143
1144         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1145                cluuid->uuid, conn->cookie);
1146         RETURN(0);
1147 }
1148 EXPORT_SYMBOL(class_connect);
1149
1150
1151 /* This function removes 1-3 references from the export:
1152  * 1 - for export pointer passed
1153  * and if disconnect really need
1154  * 2 - removing from hash
1155  * 3 - in client_unlink_export
1156  * The export pointer passed to this function can destroyed */
1157 int class_disconnect(struct obd_export *export)
1158 {
1159         int already_disconnected;
1160         ENTRY;
1161
1162         if (export == NULL) {
1163                 CWARN("attempting to free NULL export %p\n", export);
1164                 RETURN(-EINVAL);
1165         }
1166
1167         cfs_spin_lock(&export->exp_lock);
1168         already_disconnected = export->exp_disconnected;
1169         export->exp_disconnected = 1;
1170         cfs_spin_unlock(&export->exp_lock);
1171
1172         /* class_cleanup(), abort_recovery(), and class_fail_export()
1173          * all end up in here, and if any of them race we shouldn't
1174          * call extra class_export_puts(). */
1175         if (already_disconnected) {
1176                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1177                 GOTO(no_disconn, already_disconnected);
1178         }
1179
1180         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1181                export->exp_handle.h_cookie);
1182
1183         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1184                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1185                              &export->exp_connection->c_peer.nid,
1186                              &export->exp_nid_hash);
1187
1188         class_unlink_export(export);
1189 no_disconn:
1190         class_export_put(export);
1191         RETURN(0);
1192 }
1193 EXPORT_SYMBOL(class_disconnect);
1194
1195 /* Return non-zero for a fully connected export */
1196 int class_connected_export(struct obd_export *exp)
1197 {
1198         if (exp) {
1199                 int connected;
1200                 cfs_spin_lock(&exp->exp_lock);
1201                 connected = (exp->exp_conn_cnt > 0);
1202                 cfs_spin_unlock(&exp->exp_lock);
1203                 return connected;
1204         }
1205         return 0;
1206 }
1207 EXPORT_SYMBOL(class_connected_export);
1208
1209 static void class_disconnect_export_list(cfs_list_t *list,
1210                                          enum obd_option flags)
1211 {
1212         int rc;
1213         struct obd_export *exp;
1214         ENTRY;
1215
1216         /* It's possible that an export may disconnect itself, but
1217          * nothing else will be added to this list. */
1218         while (!cfs_list_empty(list)) {
1219                 exp = cfs_list_entry(list->next, struct obd_export,
1220                                      exp_obd_chain);
1221                 /* need for safe call CDEBUG after obd_disconnect */
1222                 class_export_get(exp);
1223
1224                 cfs_spin_lock(&exp->exp_lock);
1225                 exp->exp_flags = flags;
1226                 cfs_spin_unlock(&exp->exp_lock);
1227
1228                 if (obd_uuid_equals(&exp->exp_client_uuid,
1229                                     &exp->exp_obd->obd_uuid)) {
1230                         CDEBUG(D_HA,
1231                                "exp %p export uuid == obd uuid, don't discon\n",
1232                                exp);
1233                         /* Need to delete this now so we don't end up pointing
1234                          * to work_list later when this export is cleaned up. */
1235                         cfs_list_del_init(&exp->exp_obd_chain);
1236                         class_export_put(exp);
1237                         continue;
1238                 }
1239
1240                 class_export_get(exp);
1241                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1242                        "last request at "CFS_TIME_T"\n",
1243                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1244                        exp, exp->exp_last_request_time);
1245                 /* release one export reference anyway */
1246                 rc = obd_disconnect(exp);
1247
1248                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1249                        obd_export_nid2str(exp), exp, rc);
1250                 class_export_put(exp);
1251         }
1252         EXIT;
1253 }
1254
1255 void class_disconnect_exports(struct obd_device *obd)
1256 {
1257         cfs_list_t work_list;
1258         ENTRY;
1259
1260         /* Move all of the exports from obd_exports to a work list, en masse. */
1261         CFS_INIT_LIST_HEAD(&work_list);
1262         cfs_spin_lock(&obd->obd_dev_lock);
1263         cfs_list_splice_init(&obd->obd_exports, &work_list);
1264         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1265         cfs_spin_unlock(&obd->obd_dev_lock);
1266
1267         if (!cfs_list_empty(&work_list)) {
1268                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1269                        "disconnecting them\n", obd->obd_minor, obd);
1270                 class_disconnect_export_list(&work_list,
1271                                              exp_flags_from_obd(obd));
1272         } else
1273                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1274                        obd->obd_minor, obd);
1275         EXIT;
1276 }
1277 EXPORT_SYMBOL(class_disconnect_exports);
1278
1279 /* Remove exports that have not completed recovery.
1280  */
1281 void class_disconnect_stale_exports(struct obd_device *obd,
1282                                     int (*test_export)(struct obd_export *))
1283 {
1284         cfs_list_t work_list;
1285         cfs_list_t *pos, *n;
1286         struct obd_export *exp;
1287         int evicted = 0;
1288         ENTRY;
1289
1290         CFS_INIT_LIST_HEAD(&work_list);
1291         cfs_spin_lock(&obd->obd_dev_lock);
1292         cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1293                 int failed;
1294
1295                 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1296
1297                 /* don't count self-export as client */
1298                 if (obd_uuid_equals(&exp->exp_client_uuid,
1299                                     &exp->exp_obd->obd_uuid))
1300                         continue;
1301
1302                 if (test_export(exp))
1303                         continue;
1304
1305                 cfs_spin_lock(&exp->exp_lock);
1306                 failed = exp->exp_failed;
1307                 exp->exp_failed = 1;
1308                 cfs_spin_unlock(&exp->exp_lock);
1309                 if (failed)
1310                         continue;
1311
1312                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1313                 evicted++;
1314                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1315                        obd->obd_name, exp->exp_client_uuid.uuid,
1316                        exp->exp_connection == NULL ? "<unknown>" :
1317                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1318                 print_export_data(exp, "EVICTING", 0);
1319         }
1320         cfs_spin_unlock(&obd->obd_dev_lock);
1321
1322         if (evicted) {
1323                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1324                               obd->obd_name, evicted);
1325                 obd->obd_stale_clients += evicted;
1326         }
1327         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1328                                                  OBD_OPT_ABORT_RECOV);
1329         EXIT;
1330 }
1331 EXPORT_SYMBOL(class_disconnect_stale_exports);
1332
1333 void class_fail_export(struct obd_export *exp)
1334 {
1335         int rc, already_failed;
1336
1337         cfs_spin_lock(&exp->exp_lock);
1338         already_failed = exp->exp_failed;
1339         exp->exp_failed = 1;
1340         cfs_spin_unlock(&exp->exp_lock);
1341
1342         if (already_failed) {
1343                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1344                        exp, exp->exp_client_uuid.uuid);
1345                 return;
1346         }
1347
1348         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1349                exp, exp->exp_client_uuid.uuid);
1350
1351         if (obd_dump_on_timeout)
1352                 libcfs_debug_dumplog();
1353
1354         /* need for safe call CDEBUG after obd_disconnect */
1355         class_export_get(exp);
1356
1357         /* Most callers into obd_disconnect are removing their own reference
1358          * (request, for example) in addition to the one from the hash table.
1359          * We don't have such a reference here, so make one. */
1360         class_export_get(exp);
1361         rc = obd_disconnect(exp);
1362         if (rc)
1363                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1364         else
1365                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1366                        exp, exp->exp_client_uuid.uuid);
1367         class_export_put(exp);
1368 }
1369 EXPORT_SYMBOL(class_fail_export);
1370
1371 char *obd_export_nid2str(struct obd_export *exp)
1372 {
1373         if (exp->exp_connection != NULL)
1374                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1375
1376         return "(no nid)";
1377 }
1378 EXPORT_SYMBOL(obd_export_nid2str);
1379
1380 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1381 {
1382         struct obd_export *doomed_exp = NULL;
1383         int exports_evicted = 0;
1384
1385         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1386
1387         do {
1388                 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1389                 if (doomed_exp == NULL)
1390                         break;
1391
1392                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1393                          "nid %s found, wanted nid %s, requested nid %s\n",
1394                          obd_export_nid2str(doomed_exp),
1395                          libcfs_nid2str(nid_key), nid);
1396                 LASSERTF(doomed_exp != obd->obd_self_export,
1397                          "self-export is hashed by NID?\n");
1398                 exports_evicted++;
1399                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1400                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1401                        exports_evicted);
1402                 class_fail_export(doomed_exp);
1403                 class_export_put(doomed_exp);
1404         } while (1);
1405
1406         if (!exports_evicted)
1407                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1408                        obd->obd_name, nid);
1409         return exports_evicted;
1410 }
1411 EXPORT_SYMBOL(obd_export_evict_by_nid);
1412
1413 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1414 {
1415         struct obd_export *doomed_exp = NULL;
1416         struct obd_uuid doomed_uuid;
1417         int exports_evicted = 0;
1418
1419         obd_str2uuid(&doomed_uuid, uuid);
1420         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1421                 CERROR("%s: can't evict myself\n", obd->obd_name);
1422                 return exports_evicted;
1423         }
1424
1425         doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1426
1427         if (doomed_exp == NULL) {
1428                 CERROR("%s: can't disconnect %s: no exports found\n",
1429                        obd->obd_name, uuid);
1430         } else {
1431                 CWARN("%s: evicting %s at adminstrative request\n",
1432                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1433                 class_fail_export(doomed_exp);
1434                 class_export_put(doomed_exp);
1435                 exports_evicted++;
1436         }
1437
1438         return exports_evicted;
1439 }
1440 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1441
1442 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1443 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1444 EXPORT_SYMBOL(class_export_dump_hook);
1445 #endif
1446
1447 static void print_export_data(struct obd_export *exp, const char *status,
1448                               int locks)
1449 {
1450         struct ptlrpc_reply_state *rs;
1451         struct ptlrpc_reply_state *first_reply = NULL;
1452         int nreplies = 0;
1453
1454         cfs_spin_lock(&exp->exp_lock);
1455         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1456                                 rs_exp_list) {
1457                 if (nreplies == 0)
1458                         first_reply = rs;
1459                 nreplies++;
1460         }
1461         cfs_spin_unlock(&exp->exp_lock);
1462
1463         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1464                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1465                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1466                cfs_atomic_read(&exp->exp_rpc_count),
1467                cfs_atomic_read(&exp->exp_cb_count),
1468                cfs_atomic_read(&exp->exp_locks_count),
1469                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1470                nreplies, first_reply, nreplies > 3 ? "..." : "",
1471                exp->exp_last_committed);
1472 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1473         if (locks && class_export_dump_hook != NULL)
1474                 class_export_dump_hook(exp);
1475 #endif
1476 }
1477
1478 void dump_exports(struct obd_device *obd, int locks)
1479 {
1480         struct obd_export *exp;
1481
1482         cfs_spin_lock(&obd->obd_dev_lock);
1483         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1484                 print_export_data(exp, "ACTIVE", locks);
1485         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1486                 print_export_data(exp, "UNLINKED", locks);
1487         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1488                 print_export_data(exp, "DELAYED", locks);
1489         cfs_spin_unlock(&obd->obd_dev_lock);
1490         cfs_spin_lock(&obd_zombie_impexp_lock);
1491         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1492                 print_export_data(exp, "ZOMBIE", locks);
1493         cfs_spin_unlock(&obd_zombie_impexp_lock);
1494 }
1495 EXPORT_SYMBOL(dump_exports);
1496
1497 void obd_exports_barrier(struct obd_device *obd)
1498 {
1499         int waited = 2;
1500         LASSERT(cfs_list_empty(&obd->obd_exports));
1501         cfs_spin_lock(&obd->obd_dev_lock);
1502         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1503                 cfs_spin_unlock(&obd->obd_dev_lock);
1504                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1505                                                    cfs_time_seconds(waited));
1506                 if (waited > 5 && IS_PO2(waited)) {
1507                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1508                                       "more than %d seconds. "
1509                                       "The obd refcount = %d. Is it stuck?\n",
1510                                       obd->obd_name, waited,
1511                                       cfs_atomic_read(&obd->obd_refcount));
1512                         dump_exports(obd, 1);
1513                 }
1514                 waited *= 2;
1515                 cfs_spin_lock(&obd->obd_dev_lock);
1516         }
1517         cfs_spin_unlock(&obd->obd_dev_lock);
1518 }
1519 EXPORT_SYMBOL(obd_exports_barrier);
1520
1521 /* Total amount of zombies to be destroyed */
1522 static int zombies_count = 0;
1523
1524 /**
1525  * kill zombie imports and exports
1526  */
1527 void obd_zombie_impexp_cull(void)
1528 {
1529         struct obd_import *import;
1530         struct obd_export *export;
1531         ENTRY;
1532
1533         do {
1534                 cfs_spin_lock(&obd_zombie_impexp_lock);
1535
1536                 import = NULL;
1537                 if (!cfs_list_empty(&obd_zombie_imports)) {
1538                         import = cfs_list_entry(obd_zombie_imports.next,
1539                                                 struct obd_import,
1540                                                 imp_zombie_chain);
1541                         cfs_list_del_init(&import->imp_zombie_chain);
1542                 }
1543
1544                 export = NULL;
1545                 if (!cfs_list_empty(&obd_zombie_exports)) {
1546                         export = cfs_list_entry(obd_zombie_exports.next,
1547                                                 struct obd_export,
1548                                                 exp_obd_chain);
1549                         cfs_list_del_init(&export->exp_obd_chain);
1550                 }
1551
1552                 cfs_spin_unlock(&obd_zombie_impexp_lock);
1553
1554                 if (import != NULL) {
1555                         class_import_destroy(import);
1556                         cfs_spin_lock(&obd_zombie_impexp_lock);
1557                         zombies_count--;
1558                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1559                 }
1560
1561                 if (export != NULL) {
1562                         class_export_destroy(export);
1563                         cfs_spin_lock(&obd_zombie_impexp_lock);
1564                         zombies_count--;
1565                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1566                 }
1567
1568                 cfs_cond_resched();
1569         } while (import != NULL || export != NULL);
1570         EXIT;
1571 }
1572
1573 static cfs_completion_t         obd_zombie_start;
1574 static cfs_completion_t         obd_zombie_stop;
1575 static unsigned long            obd_zombie_flags;
1576 static cfs_waitq_t              obd_zombie_waitq;
1577 static pid_t                    obd_zombie_pid;
1578
1579 enum {
1580         OBD_ZOMBIE_STOP   = 1 << 1
1581 };
1582
1583 /**
1584  * check for work for kill zombie import/export thread.
1585  */
1586 static int obd_zombie_impexp_check(void *arg)
1587 {
1588         int rc;
1589
1590         cfs_spin_lock(&obd_zombie_impexp_lock);
1591         rc = (zombies_count == 0) &&
1592              !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1593         cfs_spin_unlock(&obd_zombie_impexp_lock);
1594
1595         RETURN(rc);
1596 }
1597
1598 /**
1599  * Add export to the obd_zombe thread and notify it.
1600  */
1601 static void obd_zombie_export_add(struct obd_export *exp) {
1602         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1603         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1604         cfs_list_del_init(&exp->exp_obd_chain);
1605         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1606         cfs_spin_lock(&obd_zombie_impexp_lock);
1607         zombies_count++;
1608         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1609         cfs_spin_unlock(&obd_zombie_impexp_lock);
1610
1611         obd_zombie_impexp_notify();
1612 }
1613
1614 /**
1615  * Add import to the obd_zombe thread and notify it.
1616  */
1617 static void obd_zombie_import_add(struct obd_import *imp) {
1618         LASSERT(imp->imp_sec == NULL);
1619         LASSERT(imp->imp_rq_pool == NULL);
1620         cfs_spin_lock(&obd_zombie_impexp_lock);
1621         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1622         zombies_count++;
1623         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1624         cfs_spin_unlock(&obd_zombie_impexp_lock);
1625
1626         obd_zombie_impexp_notify();
1627 }
1628
1629 /**
1630  * notify import/export destroy thread about new zombie.
1631  */
1632 static void obd_zombie_impexp_notify(void)
1633 {
1634         /*
1635          * Make sure obd_zomebie_impexp_thread get this notification.
1636          * It is possible this signal only get by obd_zombie_barrier, and
1637          * barrier gulps this notification and sleeps away and hangs ensues
1638          */
1639         cfs_waitq_broadcast(&obd_zombie_waitq);
1640 }
1641
1642 /**
1643  * check whether obd_zombie is idle
1644  */
1645 static int obd_zombie_is_idle(void)
1646 {
1647         int rc;
1648
1649         LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1650         cfs_spin_lock(&obd_zombie_impexp_lock);
1651         rc = (zombies_count == 0);
1652         cfs_spin_unlock(&obd_zombie_impexp_lock);
1653         return rc;
1654 }
1655
1656 /**
1657  * wait when obd_zombie import/export queues become empty
1658  */
1659 void obd_zombie_barrier(void)
1660 {
1661         struct l_wait_info lwi = { 0 };
1662
1663         if (obd_zombie_pid == cfs_curproc_pid())
1664                 /* don't wait for myself */
1665                 return;
1666         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1667 }
1668 EXPORT_SYMBOL(obd_zombie_barrier);
1669
1670 #ifdef __KERNEL__
1671
1672 /**
1673  * destroy zombie export/import thread.
1674  */
1675 static int obd_zombie_impexp_thread(void *unused)
1676 {
1677         int rc;
1678
1679         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1680                 cfs_complete(&obd_zombie_start);
1681                 RETURN(rc);
1682         }
1683
1684         cfs_complete(&obd_zombie_start);
1685
1686         obd_zombie_pid = cfs_curproc_pid();
1687
1688         while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1689                 struct l_wait_info lwi = { 0 };
1690
1691                 l_wait_event(obd_zombie_waitq,
1692                              !obd_zombie_impexp_check(NULL), &lwi);
1693                 obd_zombie_impexp_cull();
1694
1695                 /*
1696                  * Notify obd_zombie_barrier callers that queues
1697                  * may be empty.
1698                  */
1699                 cfs_waitq_signal(&obd_zombie_waitq);
1700         }
1701
1702         cfs_complete(&obd_zombie_stop);
1703
1704         RETURN(0);
1705 }
1706
1707 #else /* ! KERNEL */
1708
1709 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1710 static void *obd_zombie_impexp_work_cb;
1711 static void *obd_zombie_impexp_idle_cb;
1712
1713 int obd_zombie_impexp_kill(void *arg)
1714 {
1715         int rc = 0;
1716
1717         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1718                 obd_zombie_impexp_cull();
1719                 rc = 1;
1720         }
1721         cfs_atomic_dec(&zombie_recur);
1722         return rc;
1723 }
1724
1725 #endif
1726
1727 /**
1728  * start destroy zombie import/export thread
1729  */
1730 int obd_zombie_impexp_init(void)
1731 {
1732         int rc;
1733
1734         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1735         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1736         cfs_spin_lock_init(&obd_zombie_impexp_lock);
1737         cfs_init_completion(&obd_zombie_start);
1738         cfs_init_completion(&obd_zombie_stop);
1739         cfs_waitq_init(&obd_zombie_waitq);
1740         obd_zombie_pid = 0;
1741
1742 #ifdef __KERNEL__
1743         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1744         if (rc < 0)
1745                 RETURN(rc);
1746
1747         cfs_wait_for_completion(&obd_zombie_start);
1748 #else
1749
1750         obd_zombie_impexp_work_cb =
1751                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1752                                                  &obd_zombie_impexp_kill, NULL);
1753
1754         obd_zombie_impexp_idle_cb =
1755                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1756                                                  &obd_zombie_impexp_check, NULL);
1757         rc = 0;
1758 #endif
1759         RETURN(rc);
1760 }
1761 /**
1762  * stop destroy zombie import/export thread
1763  */
1764 void obd_zombie_impexp_stop(void)
1765 {
1766         cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1767         obd_zombie_impexp_notify();
1768 #ifdef __KERNEL__
1769         cfs_wait_for_completion(&obd_zombie_stop);
1770 #else
1771         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1772         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1773 #endif
1774 }
1775
1776 /***** Kernel-userspace comm helpers *******/
1777
1778 /* Get length of entire message, including header */
1779 int kuc_len(int payload_len)
1780 {
1781         return sizeof(struct kuc_hdr) + payload_len;
1782 }
1783 EXPORT_SYMBOL(kuc_len);
1784
1785 /* Get a pointer to kuc header, given a ptr to the payload
1786  * @param p Pointer to payload area
1787  * @returns Pointer to kuc header
1788  */
1789 struct kuc_hdr * kuc_ptr(void *p)
1790 {
1791         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1792         LASSERT(lh->kuc_magic == KUC_MAGIC);
1793         return lh;
1794 }
1795 EXPORT_SYMBOL(kuc_ptr);
1796
1797 /* Test if payload is part of kuc message
1798  * @param p Pointer to payload area
1799  * @returns boolean
1800  */
1801 int kuc_ispayload(void *p)
1802 {
1803         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1804
1805         if (kh->kuc_magic == KUC_MAGIC)
1806                 return 1;
1807         else
1808                 return 0;
1809 }
1810 EXPORT_SYMBOL(kuc_ispayload);
1811
1812 /* Alloc space for a message, and fill in header
1813  * @return Pointer to payload area
1814  */
1815 void *kuc_alloc(int payload_len, int transport, int type)
1816 {
1817         struct kuc_hdr *lh;
1818         int len = kuc_len(payload_len);
1819
1820         OBD_ALLOC(lh, len);
1821         if (lh == NULL)
1822                 return ERR_PTR(-ENOMEM);
1823
1824         lh->kuc_magic = KUC_MAGIC;
1825         lh->kuc_transport = transport;
1826         lh->kuc_msgtype = type;
1827         lh->kuc_msglen = len;
1828
1829         return (void *)(lh + 1);
1830 }
1831 EXPORT_SYMBOL(kuc_alloc);
1832
1833 /* Takes pointer to payload area */
1834 inline void kuc_free(void *p, int payload_len)
1835 {
1836         struct kuc_hdr *lh = kuc_ptr(p);
1837         OBD_FREE(lh, kuc_len(payload_len));
1838 }
1839 EXPORT_SYMBOL(kuc_free);
1840
1841
1842