Whamcloud - gitweb
57e612a57b2b29c7a50fd3add1e7253da4e4912d
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
/* List of registered obd types (defined outside this file) and the
 * spinlock this file takes while walking or modifying that list. */
extern cfs_list_t obd_types;
cfs_spinlock_t obd_types_lock;

/* Slab caches created by obd_init_caches() and torn down by
 * obd_cleanup_caches(). */
cfs_mem_cache_t *obd_device_cachep;
cfs_mem_cache_t *obdo_cachep;
EXPORT_SYMBOL(obdo_cachep);
cfs_mem_cache_t *import_cachep;

/* Zombie import/export lists and their lock.
 * NOTE(review): presumably filled by obd_zombie_{export,import}_add()
 * and drained later in this file -- not visible from here, confirm. */
cfs_list_t      obd_zombie_imports;
cfs_list_t      obd_zombie_exports;
cfs_spinlock_t  obd_zombie_impexp_lock;
/* Forward declarations of helpers defined later in this file. */
static void obd_zombie_impexp_notify(void);
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks);

/* Function pointer called by class_export_destroy() to release an
 * export's connection; it is not assigned anywhere in this part of
 * the file. */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         cfs_list_t *tmp;
102         struct obd_type *type;
103
104         cfs_spin_lock(&obd_types_lock);
105         cfs_list_for_each(tmp, &obd_types) {
106                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         cfs_spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         cfs_spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115
116 struct obd_type *class_get_type(const char *name)
117 {
118         struct obd_type *type = class_search_type(name);
119
120 #ifdef HAVE_MODULE_LOADING_SUPPORT
121         if (!type) {
122                 const char *modname = name;
123                 if (!cfs_request_module("%s", modname)) {
124                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
125                         type = class_search_type(name);
126                 } else {
127                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
128                                            modname);
129                 }
130         }
131 #endif
132         if (type) {
133                 cfs_spin_lock(&type->obd_type_lock);
134                 type->typ_refcnt++;
135                 cfs_try_module_get(type->typ_dt_ops->o_owner);
136                 cfs_spin_unlock(&type->obd_type_lock);
137         }
138         return type;
139 }
140 EXPORT_SYMBOL(class_get_type);
141
142 void class_put_type(struct obd_type *type)
143 {
144         LASSERT(type);
145         cfs_spin_lock(&type->obd_type_lock);
146         type->typ_refcnt--;
147         cfs_module_put(type->typ_dt_ops->o_owner);
148         cfs_spin_unlock(&type->obd_type_lock);
149 }
150 EXPORT_SYMBOL(class_put_type);
151
152 #define CLASS_MAX_NAME 1024
153
154 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
155                         struct lprocfs_vars *vars, const char *name,
156                         struct lu_device_type *ldt)
157 {
158         struct obd_type *type;
159         int rc = 0;
160         ENTRY;
161
162         /* sanity check */
163         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
164
165         if (class_search_type(name)) {
166                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
167                 RETURN(-EEXIST);
168         }
169
170         rc = -ENOMEM;
171         OBD_ALLOC(type, sizeof(*type));
172         if (type == NULL)
173                 RETURN(rc);
174
175         OBD_ALLOC_PTR(type->typ_dt_ops);
176         OBD_ALLOC_PTR(type->typ_md_ops);
177         OBD_ALLOC(type->typ_name, strlen(name) + 1);
178
179         if (type->typ_dt_ops == NULL ||
180             type->typ_md_ops == NULL ||
181             type->typ_name == NULL)
182                 GOTO (failed, rc);
183
184         *(type->typ_dt_ops) = *dt_ops;
185         /* md_ops is optional */
186         if (md_ops)
187                 *(type->typ_md_ops) = *md_ops;
188         strcpy(type->typ_name, name);
189         cfs_spin_lock_init(&type->obd_type_lock);
190
191 #ifdef LPROCFS
192         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
193                                               vars, type);
194         if (IS_ERR(type->typ_procroot)) {
195                 rc = PTR_ERR(type->typ_procroot);
196                 type->typ_procroot = NULL;
197                 GOTO (failed, rc);
198         }
199 #endif
200         if (ldt != NULL) {
201                 type->typ_lu = ldt;
202                 rc = lu_device_type_init(ldt);
203                 if (rc != 0)
204                         GOTO (failed, rc);
205         }
206
207         cfs_spin_lock(&obd_types_lock);
208         cfs_list_add(&type->typ_chain, &obd_types);
209         cfs_spin_unlock(&obd_types_lock);
210
211         RETURN (0);
212
213  failed:
214         if (type->typ_name != NULL)
215                 OBD_FREE(type->typ_name, strlen(name) + 1);
216         if (type->typ_md_ops != NULL)
217                 OBD_FREE_PTR(type->typ_md_ops);
218         if (type->typ_dt_ops != NULL)
219                 OBD_FREE_PTR(type->typ_dt_ops);
220         OBD_FREE(type, sizeof(*type));
221         RETURN(rc);
222 }
223 EXPORT_SYMBOL(class_register_type);
224
225 int class_unregister_type(const char *name)
226 {
227         struct obd_type *type = class_search_type(name);
228         ENTRY;
229
230         if (!type) {
231                 CERROR("unknown obd type\n");
232                 RETURN(-EINVAL);
233         }
234
235         if (type->typ_refcnt) {
236                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
237                 /* This is a bad situation, let's make the best of it */
238                 /* Remove ops, but leave the name for debugging */
239                 OBD_FREE_PTR(type->typ_dt_ops);
240                 OBD_FREE_PTR(type->typ_md_ops);
241                 RETURN(-EBUSY);
242         }
243
244         if (type->typ_procroot) {
245                 lprocfs_remove(&type->typ_procroot);
246         }
247
248         if (type->typ_lu)
249                 lu_device_type_fini(type->typ_lu);
250
251         cfs_spin_lock(&obd_types_lock);
252         cfs_list_del(&type->typ_chain);
253         cfs_spin_unlock(&obd_types_lock);
254         OBD_FREE(type->typ_name, strlen(name) + 1);
255         if (type->typ_dt_ops != NULL)
256                 OBD_FREE_PTR(type->typ_dt_ops);
257         if (type->typ_md_ops != NULL)
258                 OBD_FREE_PTR(type->typ_md_ops);
259         OBD_FREE(type, sizeof(*type));
260         RETURN(0);
261 } /* class_unregister_type */
262 EXPORT_SYMBOL(class_unregister_type);
263
264 /**
265  * Create a new obd device.
266  *
267  * Find an empty slot in ::obd_devs[], create a new obd device in it.
268  *
269  * \param[in] type_name obd device type string.
270  * \param[in] name      obd device name.
271  *
272  * \retval NULL if create fails, otherwise return the obd device
273  *         pointer created.
274  */
275 struct obd_device *class_newdev(const char *type_name, const char *name)
276 {
277         struct obd_device *result = NULL;
278         struct obd_device *newdev;
279         struct obd_type *type = NULL;
280         int i;
281         int new_obd_minor = 0;
282         ENTRY;
283
284         if (strlen(name) >= MAX_OBD_NAME) {
285                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
286                 RETURN(ERR_PTR(-EINVAL));
287         }
288
289         type = class_get_type(type_name);
290         if (type == NULL){
291                 CERROR("OBD: unknown type: %s\n", type_name);
292                 RETURN(ERR_PTR(-ENODEV));
293         }
294
295         newdev = obd_device_alloc();
296         if (newdev == NULL) {
297                 class_put_type(type);
298                 RETURN(ERR_PTR(-ENOMEM));
299         }
300         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
301
302         cfs_write_lock(&obd_dev_lock);
303         for (i = 0; i < class_devno_max(); i++) {
304                 struct obd_device *obd = class_num2obd(i);
305
306                 if (obd && obd->obd_name &&
307                     (strcmp(name, obd->obd_name) == 0)) {
308                         CERROR("Device %s already exists at %d, won't add\n",
309                                name, i);
310                         if (result) {
311                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
312                                          "%p obd_magic %08x != %08x\n", result,
313                                          result->obd_magic, OBD_DEVICE_MAGIC);
314                                 LASSERTF(result->obd_minor == new_obd_minor,
315                                          "%p obd_minor %d != %d\n", result,
316                                          result->obd_minor, new_obd_minor);
317
318                                 obd_devs[result->obd_minor] = NULL;
319                                 result->obd_name[0]='\0';
320                          }
321                         result = ERR_PTR(-EEXIST);
322                         break;
323                 }
324                 if (!result && !obd) {
325                         result = newdev;
326                         result->obd_minor = i;
327                         new_obd_minor = i;
328                         result->obd_type = type;
329                         strncpy(result->obd_name, name,
330                                 sizeof(result->obd_name) - 1);
331                         obd_devs[i] = result;
332                 }
333         }
334         cfs_write_unlock(&obd_dev_lock);
335
336         if (result == NULL && i >= class_devno_max()) {
337                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
338                        class_devno_max());
339                 RETURN(ERR_PTR(-EOVERFLOW));
340         }
341
342         if (IS_ERR(result)) {
343                 obd_device_free(newdev);
344                 class_put_type(type);
345         } else {
346                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
347                        result->obd_name, result);
348         }
349         RETURN(result);
350 }
351
352 void class_release_dev(struct obd_device *obd)
353 {
354         struct obd_type *obd_type = obd->obd_type;
355
356         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
357                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
358         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
359                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
360         LASSERT(obd_type != NULL);
361
362         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
363                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
364
365         cfs_write_lock(&obd_dev_lock);
366         obd_devs[obd->obd_minor] = NULL;
367         cfs_write_unlock(&obd_dev_lock);
368         obd_device_free(obd);
369
370         class_put_type(obd_type);
371 }
372
373 int class_name2dev(const char *name)
374 {
375         int i;
376
377         if (!name)
378                 return -1;
379
380         cfs_read_lock(&obd_dev_lock);
381         for (i = 0; i < class_devno_max(); i++) {
382                 struct obd_device *obd = class_num2obd(i);
383
384                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
385                         /* Make sure we finished attaching before we give
386                            out any references */
387                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
388                         if (obd->obd_attached) {
389                                 cfs_read_unlock(&obd_dev_lock);
390                                 return i;
391                         }
392                         break;
393                 }
394         }
395         cfs_read_unlock(&obd_dev_lock);
396
397         return -1;
398 }
399 EXPORT_SYMBOL(class_name2dev);
400
401 struct obd_device *class_name2obd(const char *name)
402 {
403         int dev = class_name2dev(name);
404
405         if (dev < 0 || dev > class_devno_max())
406                 return NULL;
407         return class_num2obd(dev);
408 }
409 EXPORT_SYMBOL(class_name2obd);
410
411 int class_uuid2dev(struct obd_uuid *uuid)
412 {
413         int i;
414
415         cfs_read_lock(&obd_dev_lock);
416         for (i = 0; i < class_devno_max(); i++) {
417                 struct obd_device *obd = class_num2obd(i);
418
419                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
420                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
421                         cfs_read_unlock(&obd_dev_lock);
422                         return i;
423                 }
424         }
425         cfs_read_unlock(&obd_dev_lock);
426
427         return -1;
428 }
429 EXPORT_SYMBOL(class_uuid2dev);
430
431 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
432 {
433         int dev = class_uuid2dev(uuid);
434         if (dev < 0)
435                 return NULL;
436         return class_num2obd(dev);
437 }
438 EXPORT_SYMBOL(class_uuid2obd);
439
440 /**
441  * Get obd device from ::obd_devs[]
442  *
443  * \param num [in] array index
444  *
445  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
446  *         otherwise return the obd device there.
447  */
448 struct obd_device *class_num2obd(int num)
449 {
450         struct obd_device *obd = NULL;
451
452         if (num < class_devno_max()) {
453                 obd = obd_devs[num];
454                 if (obd == NULL)
455                         return NULL;
456
457                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
458                          "%p obd_magic %08x != %08x\n",
459                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
460                 LASSERTF(obd->obd_minor == num,
461                          "%p obd_minor %0d != %0d\n",
462                          obd, obd->obd_minor, num);
463         }
464
465         return obd;
466 }
467 EXPORT_SYMBOL(class_num2obd);
468
469 void class_obd_list(void)
470 {
471         char *status;
472         int i;
473
474         cfs_read_lock(&obd_dev_lock);
475         for (i = 0; i < class_devno_max(); i++) {
476                 struct obd_device *obd = class_num2obd(i);
477
478                 if (obd == NULL)
479                         continue;
480                 if (obd->obd_stopping)
481                         status = "ST";
482                 else if (obd->obd_set_up)
483                         status = "UP";
484                 else if (obd->obd_attached)
485                         status = "AT";
486                 else
487                         status = "--";
488                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
489                          i, status, obd->obd_type->typ_name,
490                          obd->obd_name, obd->obd_uuid.uuid,
491                          cfs_atomic_read(&obd->obd_refcount));
492         }
493         cfs_read_unlock(&obd_dev_lock);
494         return;
495 }
496
497 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
498    specified, then only the client with that uuid is returned,
499    otherwise any client connected to the tgt is returned. */
500 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
501                                           const char * typ_name,
502                                           struct obd_uuid *grp_uuid)
503 {
504         int i;
505
506         cfs_read_lock(&obd_dev_lock);
507         for (i = 0; i < class_devno_max(); i++) {
508                 struct obd_device *obd = class_num2obd(i);
509
510                 if (obd == NULL)
511                         continue;
512                 if ((strncmp(obd->obd_type->typ_name, typ_name,
513                              strlen(typ_name)) == 0)) {
514                         if (obd_uuid_equals(tgt_uuid,
515                                             &obd->u.cli.cl_target_uuid) &&
516                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
517                                                          &obd->obd_uuid) : 1)) {
518                                 cfs_read_unlock(&obd_dev_lock);
519                                 return obd;
520                         }
521                 }
522         }
523         cfs_read_unlock(&obd_dev_lock);
524
525         return NULL;
526 }
527 EXPORT_SYMBOL(class_find_client_obd);
528
529 /* Iterate the obd_device list looking devices have grp_uuid. Start
530    searching at *next, and if a device is found, the next index to look
531    at is saved in *next. If next is NULL, then the first matching device
532    will always be returned. */
533 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
534 {
535         int i;
536
537         if (next == NULL)
538                 i = 0;
539         else if (*next >= 0 && *next < class_devno_max())
540                 i = *next;
541         else
542                 return NULL;
543
544         cfs_read_lock(&obd_dev_lock);
545         for (; i < class_devno_max(); i++) {
546                 struct obd_device *obd = class_num2obd(i);
547
548                 if (obd == NULL)
549                         continue;
550                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
551                         if (next != NULL)
552                                 *next = i+1;
553                         cfs_read_unlock(&obd_dev_lock);
554                         return obd;
555                 }
556         }
557         cfs_read_unlock(&obd_dev_lock);
558
559         return NULL;
560 }
561 EXPORT_SYMBOL(class_devices_in_group);
562
563 /**
564  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
565  * adjust sptlrpc settings accordingly.
566  */
567 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
568 {
569         struct obd_device  *obd;
570         const char         *type;
571         int                 i, rc = 0, rc2;
572
573         LASSERT(namelen > 0);
574
575         cfs_read_lock(&obd_dev_lock);
576         for (i = 0; i < class_devno_max(); i++) {
577                 obd = class_num2obd(i);
578
579                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
580                         continue;
581
582                 /* only notify mdc, osc, mdt, ost */
583                 type = obd->obd_type->typ_name;
584                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
585                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
586                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
587                     strcmp(type, LUSTRE_OST_NAME) != 0)
588                         continue;
589
590                 if (strncmp(obd->obd_name, fsname, namelen))
591                         continue;
592
593                 class_incref(obd, __FUNCTION__, obd);
594                 cfs_read_unlock(&obd_dev_lock);
595                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
596                                          sizeof(KEY_SPTLRPC_CONF),
597                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
598                 rc = rc ? rc : rc2;
599                 class_decref(obd, __FUNCTION__, obd);
600                 cfs_read_lock(&obd_dev_lock);
601         }
602         cfs_read_unlock(&obd_dev_lock);
603         return rc;
604 }
605 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
606
607 void obd_cleanup_caches(void)
608 {
609         int rc;
610
611         ENTRY;
612         if (obd_device_cachep) {
613                 rc = cfs_mem_cache_destroy(obd_device_cachep);
614                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
615                 obd_device_cachep = NULL;
616         }
617         if (obdo_cachep) {
618                 rc = cfs_mem_cache_destroy(obdo_cachep);
619                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
620                 obdo_cachep = NULL;
621         }
622         if (import_cachep) {
623                 rc = cfs_mem_cache_destroy(import_cachep);
624                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
625                 import_cachep = NULL;
626         }
627         if (capa_cachep) {
628                 rc = cfs_mem_cache_destroy(capa_cachep);
629                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
630                 capa_cachep = NULL;
631         }
632         EXIT;
633 }
634
635 int obd_init_caches(void)
636 {
637         ENTRY;
638
639         LASSERT(obd_device_cachep == NULL);
640         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
641                                                  sizeof(struct obd_device),
642                                                  0, 0);
643         if (!obd_device_cachep)
644                 GOTO(out, -ENOMEM);
645
646         LASSERT(obdo_cachep == NULL);
647         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
648                                            0, 0);
649         if (!obdo_cachep)
650                 GOTO(out, -ENOMEM);
651
652         LASSERT(import_cachep == NULL);
653         import_cachep = cfs_mem_cache_create("ll_import_cache",
654                                              sizeof(struct obd_import),
655                                              0, 0);
656         if (!import_cachep)
657                 GOTO(out, -ENOMEM);
658
659         LASSERT(capa_cachep == NULL);
660         capa_cachep = cfs_mem_cache_create("capa_cache",
661                                            sizeof(struct obd_capa), 0, 0);
662         if (!capa_cachep)
663                 GOTO(out, -ENOMEM);
664
665         RETURN(0);
666  out:
667         obd_cleanup_caches();
668         RETURN(-ENOMEM);
669
670 }
671
672 /* map connection to client */
673 struct obd_export *class_conn2export(struct lustre_handle *conn)
674 {
675         struct obd_export *export;
676         ENTRY;
677
678         if (!conn) {
679                 CDEBUG(D_CACHE, "looking for null handle\n");
680                 RETURN(NULL);
681         }
682
683         if (conn->cookie == -1) {  /* this means assign a new connection */
684                 CDEBUG(D_CACHE, "want a new connection\n");
685                 RETURN(NULL);
686         }
687
688         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
689         export = class_handle2object(conn->cookie);
690         RETURN(export);
691 }
692 EXPORT_SYMBOL(class_conn2export);
693
694 struct obd_device *class_exp2obd(struct obd_export *exp)
695 {
696         if (exp)
697                 return exp->exp_obd;
698         return NULL;
699 }
700 EXPORT_SYMBOL(class_exp2obd);
701
702 struct obd_device *class_conn2obd(struct lustre_handle *conn)
703 {
704         struct obd_export *export;
705         export = class_conn2export(conn);
706         if (export) {
707                 struct obd_device *obd = export->exp_obd;
708                 class_export_put(export);
709                 return obd;
710         }
711         return NULL;
712 }
713 EXPORT_SYMBOL(class_conn2obd);
714
715 struct obd_import *class_exp2cliimp(struct obd_export *exp)
716 {
717         struct obd_device *obd = exp->exp_obd;
718         if (obd == NULL)
719                 return NULL;
720         return obd->u.cli.cl_import;
721 }
722 EXPORT_SYMBOL(class_exp2cliimp);
723
724 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
725 {
726         struct obd_device *obd = class_conn2obd(conn);
727         if (obd == NULL)
728                 return NULL;
729         return obd->u.cli.cl_import;
730 }
731 EXPORT_SYMBOL(class_conn2cliimp);
732
733 /* Export management functions */
734 static void class_export_destroy(struct obd_export *exp)
735 {
736         struct obd_device *obd = exp->exp_obd;
737         ENTRY;
738
739         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
740         LASSERT(obd != NULL);
741
742         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
743                exp->exp_client_uuid.uuid, obd->obd_name);
744
745         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
746         if (exp->exp_connection)
747                 ptlrpc_put_connection_superhack(exp->exp_connection);
748
749         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
750         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
751         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
752         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
753         obd_destroy_export(exp);
754         class_decref(obd, "export", exp);
755
756         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
757         EXIT;
758 }
759
/* hop_addref callback for export handles: take one reference on the
 * export passed in as an untyped pointer. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
764
/* Handle operations passed to class_handle_hash() for every export
 * created by class_new_export(); no hop_free callback is provided. */
static struct portals_handle_ops export_handle_ops = {
        .hop_addref = export_handle_addref,
        .hop_free   = NULL,
};
769
770 struct obd_export *class_export_get(struct obd_export *exp)
771 {
772         cfs_atomic_inc(&exp->exp_refcount);
773         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
774                cfs_atomic_read(&exp->exp_refcount));
775         return exp;
776 }
777 EXPORT_SYMBOL(class_export_get);
778
779 void class_export_put(struct obd_export *exp)
780 {
781         LASSERT(exp != NULL);
782         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
783         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
784                cfs_atomic_read(&exp->exp_refcount) - 1);
785
786         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
787                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
788                 CDEBUG(D_IOCTL, "final put %p/%s\n",
789                        exp, exp->exp_client_uuid.uuid);
790
791                 /* release nid stat refererence */
792                 lprocfs_exp_cleanup(exp);
793
794                 obd_zombie_export_add(exp);
795         }
796 }
797 EXPORT_SYMBOL(class_export_put);
798
799 /* Creates a new export, adds it to the hash table, and returns a
800  * pointer to it. The refcount is 2: one for the hash reference, and
801  * one for the pointer returned by this function. */
802 struct obd_export *class_new_export(struct obd_device *obd,
803                                     struct obd_uuid *cluuid)
804 {
805         struct obd_export *export;
806         cfs_hash_t *hash = NULL;
807         int rc = 0;
808         ENTRY;
809
810         OBD_ALLOC_PTR(export);
811         if (!export)
812                 return ERR_PTR(-ENOMEM);
813
814         export->exp_conn_cnt = 0;
815         export->exp_lock_hash = NULL;
816         export->exp_flock_hash = NULL;
817         cfs_atomic_set(&export->exp_refcount, 2);
818         cfs_atomic_set(&export->exp_rpc_count, 0);
819         cfs_atomic_set(&export->exp_cb_count, 0);
820         cfs_atomic_set(&export->exp_locks_count, 0);
821 #if LUSTRE_TRACKS_LOCK_EXP_REFS
822         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
823         cfs_spin_lock_init(&export->exp_locks_list_guard);
824 #endif
825         cfs_atomic_set(&export->exp_replay_count, 0);
826         export->exp_obd = obd;
827         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
828         cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
829         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
830         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
831         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
832         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
833         class_handle_hash(&export->exp_handle, &export_handle_ops);
834         export->exp_last_request_time = cfs_time_current_sec();
835         cfs_spin_lock_init(&export->exp_lock);
836         cfs_spin_lock_init(&export->exp_rpc_lock);
837         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
838         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
839         cfs_spin_lock_init(&export->exp_bl_list_lock);
840         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
841
842         export->exp_sp_peer = LUSTRE_SP_ANY;
843         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
844         export->exp_client_uuid = *cluuid;
845         obd_init_export(export);
846
847         cfs_spin_lock(&obd->obd_dev_lock);
848          /* shouldn't happen, but might race */
849         if (obd->obd_stopping)
850                 GOTO(exit_unlock, rc = -ENODEV);
851
852         hash = cfs_hash_getref(obd->obd_uuid_hash);
853         if (hash == NULL)
854                 GOTO(exit_unlock, rc = -ENODEV);
855         cfs_spin_unlock(&obd->obd_dev_lock);
856
857         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
858                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
859                 if (rc != 0) {
860                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
861                                       obd->obd_name, cluuid->uuid, rc);
862                         GOTO(exit_err, rc = -EALREADY);
863                 }
864         }
865
866         cfs_spin_lock(&obd->obd_dev_lock);
867         if (obd->obd_stopping) {
868                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
869                 GOTO(exit_unlock, rc = -ENODEV);
870         }
871
872         class_incref(obd, "export", export);
873         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
874         cfs_list_add_tail(&export->exp_obd_chain_timed,
875                           &export->exp_obd->obd_exports_timed);
876         export->exp_obd->obd_num_exports++;
877         cfs_spin_unlock(&obd->obd_dev_lock);
878         cfs_hash_putref(hash);
879         RETURN(export);
880
881 exit_unlock:
882         cfs_spin_unlock(&obd->obd_dev_lock);
883 exit_err:
884         if (hash)
885                 cfs_hash_putref(hash);
886         class_handle_unhash(&export->exp_handle);
887         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
888         obd_destroy_export(export);
889         OBD_FREE_PTR(export);
890         return ERR_PTR(rc);
891 }
892 EXPORT_SYMBOL(class_new_export);
893
894 void class_unlink_export(struct obd_export *exp)
895 {
896         class_handle_unhash(&exp->exp_handle);
897
898         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
899         /* delete an uuid-export hashitem from hashtables */
900         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
901                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
902                              &exp->exp_client_uuid,
903                              &exp->exp_uuid_hash);
904
905         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
906         cfs_list_del_init(&exp->exp_obd_chain_timed);
907         exp->exp_obd->obd_num_exports--;
908         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
909         class_export_put(exp);
910 }
911 EXPORT_SYMBOL(class_unlink_export);
912
913 /* Import management functions */
914 void class_import_destroy(struct obd_import *imp)
915 {
916         ENTRY;
917
918         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
919                 imp->imp_obd->obd_name);
920
921         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
922
923         ptlrpc_put_connection_superhack(imp->imp_connection);
924
925         while (!cfs_list_empty(&imp->imp_conn_list)) {
926                 struct obd_import_conn *imp_conn;
927
928                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
929                                           struct obd_import_conn, oic_item);
930                 cfs_list_del_init(&imp_conn->oic_item);
931                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
932                 OBD_FREE(imp_conn, sizeof(*imp_conn));
933         }
934
935         LASSERT(imp->imp_sec == NULL);
936         class_decref(imp->imp_obd, "import", imp);
937         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
938         EXIT;
939 }
940
/* Handle-ops callback: take a reference on the import behind a handle. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
945
946 static struct portals_handle_ops import_handle_ops = {
947         .hop_addref = import_handle_addref,
948         .hop_free   = NULL,
949 };
950
951 struct obd_import *class_import_get(struct obd_import *import)
952 {
953         cfs_atomic_inc(&import->imp_refcount);
954         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
955                cfs_atomic_read(&import->imp_refcount),
956                import->imp_obd->obd_name);
957         return import;
958 }
959 EXPORT_SYMBOL(class_import_get);
960
961 void class_import_put(struct obd_import *imp)
962 {
963         ENTRY;
964
965         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
966         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
967
968         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
969                cfs_atomic_read(&imp->imp_refcount) - 1,
970                imp->imp_obd->obd_name);
971
972         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
973                 CDEBUG(D_INFO, "final put import %p\n", imp);
974                 obd_zombie_import_add(imp);
975         }
976
977         EXIT;
978 }
979 EXPORT_SYMBOL(class_import_put);
980
981 static void init_imp_at(struct imp_at *at) {
982         int i;
983         at_init(&at->iat_net_latency, 0, 0);
984         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
985                 /* max service estimates are tracked on the server side, so
986                    don't use the AT history here, just use the last reported
987                    val. (But keep hist for proc histogram, worst_ever) */
988                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
989                         AT_FLG_NOHIST);
990         }
991 }
992
993 struct obd_import *class_new_import(struct obd_device *obd)
994 {
995         struct obd_import *imp;
996
997         OBD_ALLOC(imp, sizeof(*imp));
998         if (imp == NULL)
999                 return NULL;
1000
1001         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1002         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1003         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1004         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1005         cfs_spin_lock_init(&imp->imp_lock);
1006         imp->imp_last_success_conn = 0;
1007         imp->imp_state = LUSTRE_IMP_NEW;
1008         imp->imp_obd = class_incref(obd, "import", imp);
1009         cfs_mutex_init(&imp->imp_sec_mutex);
1010         cfs_waitq_init(&imp->imp_recovery_waitq);
1011
1012         cfs_atomic_set(&imp->imp_refcount, 2);
1013         cfs_atomic_set(&imp->imp_unregistering, 0);
1014         cfs_atomic_set(&imp->imp_inflight, 0);
1015         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1016         cfs_atomic_set(&imp->imp_inval_count, 0);
1017         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1018         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1019         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1020         init_imp_at(&imp->imp_at);
1021
1022         /* the default magic is V2, will be used in connect RPC, and
1023          * then adjusted according to the flags in request/reply. */
1024         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1025
1026         return imp;
1027 }
1028 EXPORT_SYMBOL(class_new_import);
1029
1030 void class_destroy_import(struct obd_import *import)
1031 {
1032         LASSERT(import != NULL);
1033         LASSERT(import != LP_POISON);
1034
1035         class_handle_unhash(&import->imp_handle);
1036
1037         cfs_spin_lock(&import->imp_lock);
1038         import->imp_generation++;
1039         cfs_spin_unlock(&import->imp_lock);
1040         class_import_put(import);
1041 }
1042 EXPORT_SYMBOL(class_destroy_import);
1043
1044 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1045
1046 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1047 {
1048         cfs_spin_lock(&exp->exp_locks_list_guard);
1049
1050         LASSERT(lock->l_exp_refs_nr >= 0);
1051
1052         if (lock->l_exp_refs_target != NULL &&
1053             lock->l_exp_refs_target != exp) {
1054                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1055                               exp, lock, lock->l_exp_refs_target);
1056         }
1057         if ((lock->l_exp_refs_nr ++) == 0) {
1058                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1059                 lock->l_exp_refs_target = exp;
1060         }
1061         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1062                lock, exp, lock->l_exp_refs_nr);
1063         cfs_spin_unlock(&exp->exp_locks_list_guard);
1064 }
1065 EXPORT_SYMBOL(__class_export_add_lock_ref);
1066
1067 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1068 {
1069         cfs_spin_lock(&exp->exp_locks_list_guard);
1070         LASSERT(lock->l_exp_refs_nr > 0);
1071         if (lock->l_exp_refs_target != exp) {
1072                 LCONSOLE_WARN("lock %p, "
1073                               "mismatching export pointers: %p, %p\n",
1074                               lock, lock->l_exp_refs_target, exp);
1075         }
1076         if (-- lock->l_exp_refs_nr == 0) {
1077                 cfs_list_del_init(&lock->l_exp_refs_link);
1078                 lock->l_exp_refs_target = NULL;
1079         }
1080         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1081                lock, exp, lock->l_exp_refs_nr);
1082         cfs_spin_unlock(&exp->exp_locks_list_guard);
1083 }
1084 EXPORT_SYMBOL(__class_export_del_lock_ref);
1085 #endif
1086
1087 /* A connection defines an export context in which preallocation can
1088    be managed. This releases the export pointer reference, and returns
1089    the export handle, so the export refcount is 1 when this function
1090    returns. */
1091 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1092                   struct obd_uuid *cluuid)
1093 {
1094         struct obd_export *export;
1095         LASSERT(conn != NULL);
1096         LASSERT(obd != NULL);
1097         LASSERT(cluuid != NULL);
1098         ENTRY;
1099
1100         export = class_new_export(obd, cluuid);
1101         if (IS_ERR(export))
1102                 RETURN(PTR_ERR(export));
1103
1104         conn->cookie = export->exp_handle.h_cookie;
1105         class_export_put(export);
1106
1107         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1108                cluuid->uuid, conn->cookie);
1109         RETURN(0);
1110 }
1111 EXPORT_SYMBOL(class_connect);
1112
1113 /* if export is involved in recovery then clean up related things */
1114 void class_export_recovery_cleanup(struct obd_export *exp)
1115 {
1116         struct obd_device *obd = exp->exp_obd;
1117
1118         cfs_spin_lock(&obd->obd_recovery_task_lock);
1119         if (exp->exp_delayed)
1120                 obd->obd_delayed_clients--;
1121         if (obd->obd_recovering && exp->exp_in_recovery) {
1122                 cfs_spin_lock(&exp->exp_lock);
1123                 exp->exp_in_recovery = 0;
1124                 cfs_spin_unlock(&exp->exp_lock);
1125                 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1126                 cfs_atomic_dec(&obd->obd_connected_clients);
1127         }
1128         cfs_spin_unlock(&obd->obd_recovery_task_lock);
1129         /** Cleanup req replay fields */
1130         if (exp->exp_req_replay_needed) {
1131                 cfs_spin_lock(&exp->exp_lock);
1132                 exp->exp_req_replay_needed = 0;
1133                 cfs_spin_unlock(&exp->exp_lock);
1134                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1135                 cfs_atomic_dec(&obd->obd_req_replay_clients);
1136         }
1137         /** Cleanup lock replay data */
1138         if (exp->exp_lock_replay_needed) {
1139                 cfs_spin_lock(&exp->exp_lock);
1140                 exp->exp_lock_replay_needed = 0;
1141                 cfs_spin_unlock(&exp->exp_lock);
1142                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1143                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1144         }
1145 }
1146
1147 /* This function removes 1-3 references from the export:
1148  * 1 - for export pointer passed
1149  * and if disconnect really need
1150  * 2 - removing from hash
1151  * 3 - in client_unlink_export
1152  * The export pointer passed to this function can destroyed */
1153 int class_disconnect(struct obd_export *export)
1154 {
1155         int already_disconnected;
1156         ENTRY;
1157
1158         if (export == NULL) {
1159                 CWARN("attempting to free NULL export %p\n", export);
1160                 RETURN(-EINVAL);
1161         }
1162
1163         cfs_spin_lock(&export->exp_lock);
1164         already_disconnected = export->exp_disconnected;
1165         export->exp_disconnected = 1;
1166         cfs_spin_unlock(&export->exp_lock);
1167
1168         /* class_cleanup(), abort_recovery(), and class_fail_export()
1169          * all end up in here, and if any of them race we shouldn't
1170          * call extra class_export_puts(). */
1171         if (already_disconnected) {
1172                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1173                 GOTO(no_disconn, already_disconnected);
1174         }
1175
1176         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1177                export->exp_handle.h_cookie);
1178
1179         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1180                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1181                              &export->exp_connection->c_peer.nid,
1182                              &export->exp_nid_hash);
1183
1184         class_export_recovery_cleanup(export);
1185         class_unlink_export(export);
1186 no_disconn:
1187         class_export_put(export);
1188         RETURN(0);
1189 }
1190 EXPORT_SYMBOL(class_disconnect);
1191
1192 /* Return non-zero for a fully connected export */
1193 int class_connected_export(struct obd_export *exp)
1194 {
1195         if (exp) {
1196                 int connected;
1197                 cfs_spin_lock(&exp->exp_lock);
1198                 connected = (exp->exp_conn_cnt > 0);
1199                 cfs_spin_unlock(&exp->exp_lock);
1200                 return connected;
1201         }
1202         return 0;
1203 }
1204 EXPORT_SYMBOL(class_connected_export);
1205
1206 static void class_disconnect_export_list(cfs_list_t *list,
1207                                          enum obd_option flags)
1208 {
1209         int rc;
1210         struct obd_export *exp;
1211         ENTRY;
1212
1213         /* It's possible that an export may disconnect itself, but
1214          * nothing else will be added to this list. */
1215         while (!cfs_list_empty(list)) {
1216                 exp = cfs_list_entry(list->next, struct obd_export,
1217                                      exp_obd_chain);
1218                 /* need for safe call CDEBUG after obd_disconnect */
1219                 class_export_get(exp);
1220
1221                 cfs_spin_lock(&exp->exp_lock);
1222                 exp->exp_flags = flags;
1223                 cfs_spin_unlock(&exp->exp_lock);
1224
1225                 if (obd_uuid_equals(&exp->exp_client_uuid,
1226                                     &exp->exp_obd->obd_uuid)) {
1227                         CDEBUG(D_HA,
1228                                "exp %p export uuid == obd uuid, don't discon\n",
1229                                exp);
1230                         /* Need to delete this now so we don't end up pointing
1231                          * to work_list later when this export is cleaned up. */
1232                         cfs_list_del_init(&exp->exp_obd_chain);
1233                         class_export_put(exp);
1234                         continue;
1235                 }
1236
1237                 class_export_get(exp);
1238                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1239                        "last request at "CFS_TIME_T"\n",
1240                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1241                        exp, exp->exp_last_request_time);
1242                 /* release one export reference anyway */
1243                 rc = obd_disconnect(exp);
1244
1245                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1246                        obd_export_nid2str(exp), exp, rc);
1247                 class_export_put(exp);
1248         }
1249         EXIT;
1250 }
1251
1252 void class_disconnect_exports(struct obd_device *obd)
1253 {
1254         cfs_list_t work_list;
1255         ENTRY;
1256
1257         /* Move all of the exports from obd_exports to a work list, en masse. */
1258         CFS_INIT_LIST_HEAD(&work_list);
1259         cfs_spin_lock(&obd->obd_dev_lock);
1260         cfs_list_splice_init(&obd->obd_exports, &work_list);
1261         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1262         cfs_spin_unlock(&obd->obd_dev_lock);
1263
1264         if (!cfs_list_empty(&work_list)) {
1265                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1266                        "disconnecting them\n", obd->obd_minor, obd);
1267                 class_disconnect_export_list(&work_list,
1268                                              exp_flags_from_obd(obd));
1269         } else
1270                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1271                        obd->obd_minor, obd);
1272         EXIT;
1273 }
1274 EXPORT_SYMBOL(class_disconnect_exports);
1275
1276 /* Remove exports that have not completed recovery.
1277  */
1278 void class_disconnect_stale_exports(struct obd_device *obd,
1279                                     int (*test_export)(struct obd_export *))
1280 {
1281         cfs_list_t work_list;
1282         struct obd_export *exp, *n;
1283         int evicted = 0;
1284         ENTRY;
1285
1286         CFS_INIT_LIST_HEAD(&work_list);
1287         cfs_spin_lock(&obd->obd_dev_lock);
1288         cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1289                                      exp_obd_chain) {
1290                 /* don't count self-export as client */
1291                 if (obd_uuid_equals(&exp->exp_client_uuid,
1292                                     &exp->exp_obd->obd_uuid))
1293                         continue;
1294
1295                 cfs_spin_lock(&exp->exp_lock);
1296                 if (test_export(exp)) {
1297                         cfs_spin_unlock(&exp->exp_lock);
1298                         continue;
1299                 }
1300                 exp->exp_failed = 1;
1301                 cfs_spin_unlock(&exp->exp_lock);
1302
1303                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1304                 evicted++;
1305                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1306                        obd->obd_name, exp->exp_client_uuid.uuid,
1307                        exp->exp_connection == NULL ? "<unknown>" :
1308                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1309                 print_export_data(exp, "EVICTING", 0);
1310         }
1311         cfs_spin_unlock(&obd->obd_dev_lock);
1312
1313         if (evicted) {
1314                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1315                               obd->obd_name, evicted);
1316                 obd->obd_stale_clients += evicted;
1317         }
1318         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1319                                                  OBD_OPT_ABORT_RECOV);
1320         EXIT;
1321 }
1322 EXPORT_SYMBOL(class_disconnect_stale_exports);
1323
1324 void class_fail_export(struct obd_export *exp)
1325 {
1326         int rc, already_failed;
1327
1328         cfs_spin_lock(&exp->exp_lock);
1329         already_failed = exp->exp_failed;
1330         exp->exp_failed = 1;
1331         cfs_spin_unlock(&exp->exp_lock);
1332
1333         if (already_failed) {
1334                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1335                        exp, exp->exp_client_uuid.uuid);
1336                 return;
1337         }
1338
1339         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1340                exp, exp->exp_client_uuid.uuid);
1341
1342         if (obd_dump_on_timeout)
1343                 libcfs_debug_dumplog();
1344
1345         /* need for safe call CDEBUG after obd_disconnect */
1346         class_export_get(exp);
1347
1348         /* Most callers into obd_disconnect are removing their own reference
1349          * (request, for example) in addition to the one from the hash table.
1350          * We don't have such a reference here, so make one. */
1351         class_export_get(exp);
1352         rc = obd_disconnect(exp);
1353         if (rc)
1354                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1355         else
1356                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1357                        exp, exp->exp_client_uuid.uuid);
1358         class_export_put(exp);
1359 }
1360 EXPORT_SYMBOL(class_fail_export);
1361
1362 char *obd_export_nid2str(struct obd_export *exp)
1363 {
1364         if (exp->exp_connection != NULL)
1365                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1366
1367         return "(no nid)";
1368 }
1369 EXPORT_SYMBOL(obd_export_nid2str);
1370
1371 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1372 {
1373         struct obd_export *doomed_exp = NULL;
1374         int exports_evicted = 0;
1375
1376         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1377
1378         do {
1379                 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1380                 if (doomed_exp == NULL)
1381                         break;
1382
1383                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1384                          "nid %s found, wanted nid %s, requested nid %s\n",
1385                          obd_export_nid2str(doomed_exp),
1386                          libcfs_nid2str(nid_key), nid);
1387                 LASSERTF(doomed_exp != obd->obd_self_export,
1388                          "self-export is hashed by NID?\n");
1389                 exports_evicted++;
1390                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1391                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1392                        exports_evicted);
1393                 class_fail_export(doomed_exp);
1394                 class_export_put(doomed_exp);
1395         } while (1);
1396
1397         if (!exports_evicted)
1398                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1399                        obd->obd_name, nid);
1400         return exports_evicted;
1401 }
1402 EXPORT_SYMBOL(obd_export_evict_by_nid);
1403
1404 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1405 {
1406         struct obd_export *doomed_exp = NULL;
1407         struct obd_uuid doomed_uuid;
1408         int exports_evicted = 0;
1409
1410         obd_str2uuid(&doomed_uuid, uuid);
1411         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1412                 CERROR("%s: can't evict myself\n", obd->obd_name);
1413                 return exports_evicted;
1414         }
1415
1416         doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1417
1418         if (doomed_exp == NULL) {
1419                 CERROR("%s: can't disconnect %s: no exports found\n",
1420                        obd->obd_name, uuid);
1421         } else {
1422                 CWARN("%s: evicting %s at adminstrative request\n",
1423                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1424                 class_fail_export(doomed_exp);
1425                 class_export_put(doomed_exp);
1426                 exports_evicted++;
1427         }
1428
1429         return exports_evicted;
1430 }
1431 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1432
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback invoked by print_export_data() when lock dumping is
 * requested; NULL until something installs a hook. */
void (*class_export_dump_hook)(struct obd_export *) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1437
1438 static void print_export_data(struct obd_export *exp, const char *status,
1439                               int locks)
1440 {
1441         struct ptlrpc_reply_state *rs;
1442         struct ptlrpc_reply_state *first_reply = NULL;
1443         int nreplies = 0;
1444
1445         cfs_spin_lock(&exp->exp_lock);
1446         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1447                                 rs_exp_list) {
1448                 if (nreplies == 0)
1449                         first_reply = rs;
1450                 nreplies++;
1451         }
1452         cfs_spin_unlock(&exp->exp_lock);
1453
1454         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1455                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1456                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1457                cfs_atomic_read(&exp->exp_rpc_count),
1458                cfs_atomic_read(&exp->exp_cb_count),
1459                cfs_atomic_read(&exp->exp_locks_count),
1460                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1461                nreplies, first_reply, nreplies > 3 ? "..." : "",
1462                exp->exp_last_committed);
1463 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1464         if (locks && class_export_dump_hook != NULL)
1465                 class_export_dump_hook(exp);
1466 #endif
1467 }
1468
1469 void dump_exports(struct obd_device *obd, int locks)
1470 {
1471         struct obd_export *exp;
1472
1473         cfs_spin_lock(&obd->obd_dev_lock);
1474         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1475                 print_export_data(exp, "ACTIVE", locks);
1476         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1477                 print_export_data(exp, "UNLINKED", locks);
1478         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1479                 print_export_data(exp, "DELAYED", locks);
1480         cfs_spin_unlock(&obd->obd_dev_lock);
1481         cfs_spin_lock(&obd_zombie_impexp_lock);
1482         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1483                 print_export_data(exp, "ZOMBIE", locks);
1484         cfs_spin_unlock(&obd_zombie_impexp_lock);
1485 }
1486 EXPORT_SYMBOL(dump_exports);
1487
1488 void obd_exports_barrier(struct obd_device *obd)
1489 {
1490         int waited = 2;
1491         LASSERT(cfs_list_empty(&obd->obd_exports));
1492         cfs_spin_lock(&obd->obd_dev_lock);
1493         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1494                 cfs_spin_unlock(&obd->obd_dev_lock);
1495                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1496                                                    cfs_time_seconds(waited));
1497                 if (waited > 5 && IS_PO2(waited)) {
1498                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1499                                       "more than %d seconds. "
1500                                       "The obd refcount = %d. Is it stuck?\n",
1501                                       obd->obd_name, waited,
1502                                       cfs_atomic_read(&obd->obd_refcount));
1503                         dump_exports(obd, 1);
1504                 }
1505                 waited *= 2;
1506                 cfs_spin_lock(&obd->obd_dev_lock);
1507         }
1508         cfs_spin_unlock(&obd->obd_dev_lock);
1509 }
1510 EXPORT_SYMBOL(obd_exports_barrier);
1511
/* Number of zombie imports and exports queued for destruction and not yet
 * destroyed; read and written under obd_zombie_impexp_lock. */
static int zombies_count;
1514
1515 /**
1516  * kill zombie imports and exports
1517  */
1518 void obd_zombie_impexp_cull(void)
1519 {
1520         struct obd_import *import;
1521         struct obd_export *export;
1522         ENTRY;
1523
1524         do {
1525                 cfs_spin_lock(&obd_zombie_impexp_lock);
1526
1527                 import = NULL;
1528                 if (!cfs_list_empty(&obd_zombie_imports)) {
1529                         import = cfs_list_entry(obd_zombie_imports.next,
1530                                                 struct obd_import,
1531                                                 imp_zombie_chain);
1532                         cfs_list_del_init(&import->imp_zombie_chain);
1533                 }
1534
1535                 export = NULL;
1536                 if (!cfs_list_empty(&obd_zombie_exports)) {
1537                         export = cfs_list_entry(obd_zombie_exports.next,
1538                                                 struct obd_export,
1539                                                 exp_obd_chain);
1540                         cfs_list_del_init(&export->exp_obd_chain);
1541                 }
1542
1543                 cfs_spin_unlock(&obd_zombie_impexp_lock);
1544
1545                 if (import != NULL) {
1546                         class_import_destroy(import);
1547                         cfs_spin_lock(&obd_zombie_impexp_lock);
1548                         zombies_count--;
1549                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1550                 }
1551
1552                 if (export != NULL) {
1553                         class_export_destroy(export);
1554                         cfs_spin_lock(&obd_zombie_impexp_lock);
1555                         zombies_count--;
1556                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1557                 }
1558
1559                 cfs_cond_resched();
1560         } while (import != NULL || export != NULL);
1561         EXIT;
1562 }
1563
1564 static cfs_completion_t         obd_zombie_start;
1565 static cfs_completion_t         obd_zombie_stop;
1566 static unsigned long            obd_zombie_flags;
1567 static cfs_waitq_t              obd_zombie_waitq;
1568 static pid_t                    obd_zombie_pid;
1569
1570 enum {
1571         OBD_ZOMBIE_STOP   = 1 << 1
1572 };
1573
1574 /**
1575  * check for work for kill zombie import/export thread.
1576  */
1577 static int obd_zombie_impexp_check(void *arg)
1578 {
1579         int rc;
1580
1581         cfs_spin_lock(&obd_zombie_impexp_lock);
1582         rc = (zombies_count == 0) &&
1583              !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1584         cfs_spin_unlock(&obd_zombie_impexp_lock);
1585
1586         RETURN(rc);
1587 }
1588
1589 /**
1590  * Add export to the obd_zombe thread and notify it.
1591  */
1592 static void obd_zombie_export_add(struct obd_export *exp) {
1593         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1594         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1595         cfs_list_del_init(&exp->exp_obd_chain);
1596         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1597         cfs_spin_lock(&obd_zombie_impexp_lock);
1598         zombies_count++;
1599         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1600         cfs_spin_unlock(&obd_zombie_impexp_lock);
1601
1602         obd_zombie_impexp_notify();
1603 }
1604
1605 /**
1606  * Add import to the obd_zombe thread and notify it.
1607  */
1608 static void obd_zombie_import_add(struct obd_import *imp) {
1609         LASSERT(imp->imp_sec == NULL);
1610         LASSERT(imp->imp_rq_pool == NULL);
1611         cfs_spin_lock(&obd_zombie_impexp_lock);
1612         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1613         zombies_count++;
1614         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1615         cfs_spin_unlock(&obd_zombie_impexp_lock);
1616
1617         obd_zombie_impexp_notify();
1618 }
1619
1620 /**
1621  * notify import/export destroy thread about new zombie.
1622  */
1623 static void obd_zombie_impexp_notify(void)
1624 {
1625         /*
1626          * Make sure obd_zomebie_impexp_thread get this notification.
1627          * It is possible this signal only get by obd_zombie_barrier, and
1628          * barrier gulps this notification and sleeps away and hangs ensues
1629          */
1630         cfs_waitq_broadcast(&obd_zombie_waitq);
1631 }
1632
1633 /**
1634  * check whether obd_zombie is idle
1635  */
1636 static int obd_zombie_is_idle(void)
1637 {
1638         int rc;
1639
1640         LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1641         cfs_spin_lock(&obd_zombie_impexp_lock);
1642         rc = (zombies_count == 0);
1643         cfs_spin_unlock(&obd_zombie_impexp_lock);
1644         return rc;
1645 }
1646
1647 /**
1648  * wait when obd_zombie import/export queues become empty
1649  */
1650 void obd_zombie_barrier(void)
1651 {
1652         struct l_wait_info lwi = { 0 };
1653
1654         if (obd_zombie_pid == cfs_curproc_pid())
1655                 /* don't wait for myself */
1656                 return;
1657         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1658 }
1659 EXPORT_SYMBOL(obd_zombie_barrier);
1660
1661 #ifdef __KERNEL__
1662
1663 /**
1664  * destroy zombie export/import thread.
1665  */
1666 static int obd_zombie_impexp_thread(void *unused)
1667 {
1668         int rc;
1669
1670         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1671                 cfs_complete(&obd_zombie_start);
1672                 RETURN(rc);
1673         }
1674
1675         cfs_complete(&obd_zombie_start);
1676
1677         obd_zombie_pid = cfs_curproc_pid();
1678
1679         while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1680                 struct l_wait_info lwi = { 0 };
1681
1682                 l_wait_event(obd_zombie_waitq,
1683                              !obd_zombie_impexp_check(NULL), &lwi);
1684                 obd_zombie_impexp_cull();
1685
1686                 /*
1687                  * Notify obd_zombie_barrier callers that queues
1688                  * may be empty.
1689                  */
1690                 cfs_waitq_signal(&obd_zombie_waitq);
1691         }
1692
1693         cfs_complete(&obd_zombie_stop);
1694
1695         RETURN(0);
1696 }
1697
1698 #else /* ! KERNEL */
1699
1700 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1701 static void *obd_zombie_impexp_work_cb;
1702 static void *obd_zombie_impexp_idle_cb;
1703
1704 int obd_zombie_impexp_kill(void *arg)
1705 {
1706         int rc = 0;
1707
1708         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1709                 obd_zombie_impexp_cull();
1710                 rc = 1;
1711         }
1712         cfs_atomic_dec(&zombie_recur);
1713         return rc;
1714 }
1715
1716 #endif
1717
1718 /**
1719  * start destroy zombie import/export thread
1720  */
1721 int obd_zombie_impexp_init(void)
1722 {
1723         int rc;
1724
1725         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1726         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1727         cfs_spin_lock_init(&obd_zombie_impexp_lock);
1728         cfs_init_completion(&obd_zombie_start);
1729         cfs_init_completion(&obd_zombie_stop);
1730         cfs_waitq_init(&obd_zombie_waitq);
1731         obd_zombie_pid = 0;
1732
1733 #ifdef __KERNEL__
1734         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1735         if (rc < 0)
1736                 RETURN(rc);
1737
1738         cfs_wait_for_completion(&obd_zombie_start);
1739 #else
1740
1741         obd_zombie_impexp_work_cb =
1742                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1743                                                  &obd_zombie_impexp_kill, NULL);
1744
1745         obd_zombie_impexp_idle_cb =
1746                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1747                                                  &obd_zombie_impexp_check, NULL);
1748         rc = 0;
1749 #endif
1750         RETURN(rc);
1751 }
1752 /**
1753  * stop destroy zombie import/export thread
1754  */
1755 void obd_zombie_impexp_stop(void)
1756 {
1757         cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1758         obd_zombie_impexp_notify();
1759 #ifdef __KERNEL__
1760         cfs_wait_for_completion(&obd_zombie_stop);
1761 #else
1762         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1763         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1764 #endif
1765 }
1766
1767 /***** Kernel-userspace comm helpers *******/
1768
1769 /* Get length of entire message, including header */
1770 int kuc_len(int payload_len)
1771 {
1772         return sizeof(struct kuc_hdr) + payload_len;
1773 }
1774 EXPORT_SYMBOL(kuc_len);
1775
1776 /* Get a pointer to kuc header, given a ptr to the payload
1777  * @param p Pointer to payload area
1778  * @returns Pointer to kuc header
1779  */
1780 struct kuc_hdr * kuc_ptr(void *p)
1781 {
1782         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1783         LASSERT(lh->kuc_magic == KUC_MAGIC);
1784         return lh;
1785 }
1786 EXPORT_SYMBOL(kuc_ptr);
1787
1788 /* Test if payload is part of kuc message
1789  * @param p Pointer to payload area
1790  * @returns boolean
1791  */
1792 int kuc_ispayload(void *p)
1793 {
1794         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1795
1796         if (kh->kuc_magic == KUC_MAGIC)
1797                 return 1;
1798         else
1799                 return 0;
1800 }
1801 EXPORT_SYMBOL(kuc_ispayload);
1802
1803 /* Alloc space for a message, and fill in header
1804  * @return Pointer to payload area
1805  */
1806 void *kuc_alloc(int payload_len, int transport, int type)
1807 {
1808         struct kuc_hdr *lh;
1809         int len = kuc_len(payload_len);
1810
1811         OBD_ALLOC(lh, len);
1812         if (lh == NULL)
1813                 return ERR_PTR(-ENOMEM);
1814
1815         lh->kuc_magic = KUC_MAGIC;
1816         lh->kuc_transport = transport;
1817         lh->kuc_msgtype = type;
1818         lh->kuc_msglen = len;
1819
1820         return (void *)(lh + 1);
1821 }
1822 EXPORT_SYMBOL(kuc_alloc);
1823
/* Free a kuc message
 * @param p Pointer to payload area; kuc_ptr() asserts that a valid kuc
 *          header precedes it
 * @param payload_len Length of the payload area; the size handed to
 *          OBD_FREE is recomputed from it with kuc_len(), so it has to be
 *          the length the message was allocated with
 *
 * Deliberately not declared "inline": the function has external linkage and
 * is exported, and under C99/C11 rules a plain "inline" definition does not
 * by itself guarantee that an out-of-line definition is emitted.
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1831
1832
1833