Whamcloud - gitweb
LU-1347 build: remove the vim/emacs modelines
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
50 extern cfs_list_t obd_types;
51 cfs_spinlock_t obd_types_lock;
52
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
57
58 cfs_list_t      obd_zombie_imports;
59 cfs_list_t      obd_zombie_exports;
60 cfs_spinlock_t  obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68
69 /*
70  * support functions: we could use inter-module communication, but this
71  * is more portable to other OS's
72  */
73 static struct obd_device *obd_device_alloc(void)
74 {
75         struct obd_device *obd;
76
77         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
78         if (obd != NULL) {
79                 obd->obd_magic = OBD_DEVICE_MAGIC;
80         }
81         return obd;
82 }
83
84 static void obd_device_free(struct obd_device *obd)
85 {
86         LASSERT(obd != NULL);
87         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
89         if (obd->obd_namespace != NULL) {
90                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91                        obd, obd->obd_namespace, obd->obd_force);
92                 LBUG();
93         }
94         lu_ref_fini(&obd->obd_reference);
95         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
96 }
97
98 struct obd_type *class_search_type(const char *name)
99 {
100         cfs_list_t *tmp;
101         struct obd_type *type;
102
103         cfs_spin_lock(&obd_types_lock);
104         cfs_list_for_each(tmp, &obd_types) {
105                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
106                 if (strcmp(type->typ_name, name) == 0) {
107                         cfs_spin_unlock(&obd_types_lock);
108                         return type;
109                 }
110         }
111         cfs_spin_unlock(&obd_types_lock);
112         return NULL;
113 }
114
115 struct obd_type *class_get_type(const char *name)
116 {
117         struct obd_type *type = class_search_type(name);
118
119 #ifdef HAVE_MODULE_LOADING_SUPPORT
120         if (!type) {
121                 const char *modname = name;
122                 if (!cfs_request_module("%s", modname)) {
123                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
124                         type = class_search_type(name);
125                 } else {
126                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
127                                            modname);
128                 }
129         }
130 #endif
131         if (type) {
132                 cfs_spin_lock(&type->obd_type_lock);
133                 type->typ_refcnt++;
134                 cfs_try_module_get(type->typ_dt_ops->o_owner);
135                 cfs_spin_unlock(&type->obd_type_lock);
136         }
137         return type;
138 }
139 EXPORT_SYMBOL(class_get_type);
140
141 void class_put_type(struct obd_type *type)
142 {
143         LASSERT(type);
144         cfs_spin_lock(&type->obd_type_lock);
145         type->typ_refcnt--;
146         cfs_module_put(type->typ_dt_ops->o_owner);
147         cfs_spin_unlock(&type->obd_type_lock);
148 }
149 EXPORT_SYMBOL(class_put_type);
150
151 #define CLASS_MAX_NAME 1024
152
153 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
154                         struct lprocfs_vars *vars, const char *name,
155                         struct lu_device_type *ldt)
156 {
157         struct obd_type *type;
158         int rc = 0;
159         ENTRY;
160
161         /* sanity check */
162         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
163
164         if (class_search_type(name)) {
165                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
166                 RETURN(-EEXIST);
167         }
168
169         rc = -ENOMEM;
170         OBD_ALLOC(type, sizeof(*type));
171         if (type == NULL)
172                 RETURN(rc);
173
174         OBD_ALLOC_PTR(type->typ_dt_ops);
175         OBD_ALLOC_PTR(type->typ_md_ops);
176         OBD_ALLOC(type->typ_name, strlen(name) + 1);
177
178         if (type->typ_dt_ops == NULL ||
179             type->typ_md_ops == NULL ||
180             type->typ_name == NULL)
181                 GOTO (failed, rc);
182
183         *(type->typ_dt_ops) = *dt_ops;
184         /* md_ops is optional */
185         if (md_ops)
186                 *(type->typ_md_ops) = *md_ops;
187         strcpy(type->typ_name, name);
188         cfs_spin_lock_init(&type->obd_type_lock);
189
190 #ifdef LPROCFS
191         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
192                                               vars, type);
193         if (IS_ERR(type->typ_procroot)) {
194                 rc = PTR_ERR(type->typ_procroot);
195                 type->typ_procroot = NULL;
196                 GOTO (failed, rc);
197         }
198 #endif
199         if (ldt != NULL) {
200                 type->typ_lu = ldt;
201                 rc = lu_device_type_init(ldt);
202                 if (rc != 0)
203                         GOTO (failed, rc);
204         }
205
206         cfs_spin_lock(&obd_types_lock);
207         cfs_list_add(&type->typ_chain, &obd_types);
208         cfs_spin_unlock(&obd_types_lock);
209
210         RETURN (0);
211
212  failed:
213         if (type->typ_name != NULL)
214                 OBD_FREE(type->typ_name, strlen(name) + 1);
215         if (type->typ_md_ops != NULL)
216                 OBD_FREE_PTR(type->typ_md_ops);
217         if (type->typ_dt_ops != NULL)
218                 OBD_FREE_PTR(type->typ_dt_ops);
219         OBD_FREE(type, sizeof(*type));
220         RETURN(rc);
221 }
222 EXPORT_SYMBOL(class_register_type);
223
224 int class_unregister_type(const char *name)
225 {
226         struct obd_type *type = class_search_type(name);
227         ENTRY;
228
229         if (!type) {
230                 CERROR("unknown obd type\n");
231                 RETURN(-EINVAL);
232         }
233
234         if (type->typ_refcnt) {
235                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
236                 /* This is a bad situation, let's make the best of it */
237                 /* Remove ops, but leave the name for debugging */
238                 OBD_FREE_PTR(type->typ_dt_ops);
239                 OBD_FREE_PTR(type->typ_md_ops);
240                 RETURN(-EBUSY);
241         }
242
243         if (type->typ_procroot) {
244                 lprocfs_remove(&type->typ_procroot);
245         }
246
247         if (type->typ_lu)
248                 lu_device_type_fini(type->typ_lu);
249
250         cfs_spin_lock(&obd_types_lock);
251         cfs_list_del(&type->typ_chain);
252         cfs_spin_unlock(&obd_types_lock);
253         OBD_FREE(type->typ_name, strlen(name) + 1);
254         if (type->typ_dt_ops != NULL)
255                 OBD_FREE_PTR(type->typ_dt_ops);
256         if (type->typ_md_ops != NULL)
257                 OBD_FREE_PTR(type->typ_md_ops);
258         OBD_FREE(type, sizeof(*type));
259         RETURN(0);
260 } /* class_unregister_type */
261 EXPORT_SYMBOL(class_unregister_type);
262
263 /**
264  * Create a new obd device.
265  *
266  * Find an empty slot in ::obd_devs[], create a new obd device in it.
267  *
268  * \param[in] type_name obd device type string.
269  * \param[in] name      obd device name.
270  *
271  * \retval NULL if create fails, otherwise return the obd device
272  *         pointer created.
273  */
274 struct obd_device *class_newdev(const char *type_name, const char *name)
275 {
276         struct obd_device *result = NULL;
277         struct obd_device *newdev;
278         struct obd_type *type = NULL;
279         int i;
280         int new_obd_minor = 0;
281         ENTRY;
282
283         if (strlen(name) >= MAX_OBD_NAME) {
284                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
285                 RETURN(ERR_PTR(-EINVAL));
286         }
287
288         type = class_get_type(type_name);
289         if (type == NULL){
290                 CERROR("OBD: unknown type: %s\n", type_name);
291                 RETURN(ERR_PTR(-ENODEV));
292         }
293
294         newdev = obd_device_alloc();
295         if (newdev == NULL) {
296                 class_put_type(type);
297                 RETURN(ERR_PTR(-ENOMEM));
298         }
299         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
300
301         cfs_write_lock(&obd_dev_lock);
302         for (i = 0; i < class_devno_max(); i++) {
303                 struct obd_device *obd = class_num2obd(i);
304
305                 if (obd && obd->obd_name &&
306                     (strcmp(name, obd->obd_name) == 0)) {
307                         CERROR("Device %s already exists at %d, won't add\n",
308                                name, i);
309                         if (result) {
310                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
311                                          "%p obd_magic %08x != %08x\n", result,
312                                          result->obd_magic, OBD_DEVICE_MAGIC);
313                                 LASSERTF(result->obd_minor == new_obd_minor,
314                                          "%p obd_minor %d != %d\n", result,
315                                          result->obd_minor, new_obd_minor);
316
317                                 obd_devs[result->obd_minor] = NULL;
318                                 result->obd_name[0]='\0';
319                          }
320                         result = ERR_PTR(-EEXIST);
321                         break;
322                 }
323                 if (!result && !obd) {
324                         result = newdev;
325                         result->obd_minor = i;
326                         new_obd_minor = i;
327                         result->obd_type = type;
328                         strncpy(result->obd_name, name,
329                                 sizeof(result->obd_name) - 1);
330                         obd_devs[i] = result;
331                 }
332         }
333         cfs_write_unlock(&obd_dev_lock);
334
335         if (result == NULL && i >= class_devno_max()) {
336                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
337                        class_devno_max());
338                 RETURN(ERR_PTR(-EOVERFLOW));
339         }
340
341         if (IS_ERR(result)) {
342                 obd_device_free(newdev);
343                 class_put_type(type);
344         } else {
345                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
346                        result->obd_name, result);
347         }
348         RETURN(result);
349 }
350
351 void class_release_dev(struct obd_device *obd)
352 {
353         struct obd_type *obd_type = obd->obd_type;
354
355         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
356                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
357         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
358                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
359         LASSERT(obd_type != NULL);
360
361         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
362                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
363
364         cfs_write_lock(&obd_dev_lock);
365         obd_devs[obd->obd_minor] = NULL;
366         cfs_write_unlock(&obd_dev_lock);
367         obd_device_free(obd);
368
369         class_put_type(obd_type);
370 }
371
372 int class_name2dev(const char *name)
373 {
374         int i;
375
376         if (!name)
377                 return -1;
378
379         cfs_read_lock(&obd_dev_lock);
380         for (i = 0; i < class_devno_max(); i++) {
381                 struct obd_device *obd = class_num2obd(i);
382
383                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
384                         /* Make sure we finished attaching before we give
385                            out any references */
386                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
387                         if (obd->obd_attached) {
388                                 cfs_read_unlock(&obd_dev_lock);
389                                 return i;
390                         }
391                         break;
392                 }
393         }
394         cfs_read_unlock(&obd_dev_lock);
395
396         return -1;
397 }
398 EXPORT_SYMBOL(class_name2dev);
399
400 struct obd_device *class_name2obd(const char *name)
401 {
402         int dev = class_name2dev(name);
403
404         if (dev < 0 || dev > class_devno_max())
405                 return NULL;
406         return class_num2obd(dev);
407 }
408 EXPORT_SYMBOL(class_name2obd);
409
410 int class_uuid2dev(struct obd_uuid *uuid)
411 {
412         int i;
413
414         cfs_read_lock(&obd_dev_lock);
415         for (i = 0; i < class_devno_max(); i++) {
416                 struct obd_device *obd = class_num2obd(i);
417
418                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
419                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
420                         cfs_read_unlock(&obd_dev_lock);
421                         return i;
422                 }
423         }
424         cfs_read_unlock(&obd_dev_lock);
425
426         return -1;
427 }
428 EXPORT_SYMBOL(class_uuid2dev);
429
430 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
431 {
432         int dev = class_uuid2dev(uuid);
433         if (dev < 0)
434                 return NULL;
435         return class_num2obd(dev);
436 }
437 EXPORT_SYMBOL(class_uuid2obd);
438
439 /**
440  * Get obd device from ::obd_devs[]
441  *
442  * \param num [in] array index
443  *
444  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
445  *         otherwise return the obd device there.
446  */
447 struct obd_device *class_num2obd(int num)
448 {
449         struct obd_device *obd = NULL;
450
451         if (num < class_devno_max()) {
452                 obd = obd_devs[num];
453                 if (obd == NULL)
454                         return NULL;
455
456                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
457                          "%p obd_magic %08x != %08x\n",
458                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
459                 LASSERTF(obd->obd_minor == num,
460                          "%p obd_minor %0d != %0d\n",
461                          obd, obd->obd_minor, num);
462         }
463
464         return obd;
465 }
466 EXPORT_SYMBOL(class_num2obd);
467
468 void class_obd_list(void)
469 {
470         char *status;
471         int i;
472
473         cfs_read_lock(&obd_dev_lock);
474         for (i = 0; i < class_devno_max(); i++) {
475                 struct obd_device *obd = class_num2obd(i);
476
477                 if (obd == NULL)
478                         continue;
479                 if (obd->obd_stopping)
480                         status = "ST";
481                 else if (obd->obd_set_up)
482                         status = "UP";
483                 else if (obd->obd_attached)
484                         status = "AT";
485                 else
486                         status = "--";
487                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
488                          i, status, obd->obd_type->typ_name,
489                          obd->obd_name, obd->obd_uuid.uuid,
490                          cfs_atomic_read(&obd->obd_refcount));
491         }
492         cfs_read_unlock(&obd_dev_lock);
493         return;
494 }
495
496 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
497    specified, then only the client with that uuid is returned,
498    otherwise any client connected to the tgt is returned. */
499 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
500                                           const char * typ_name,
501                                           struct obd_uuid *grp_uuid)
502 {
503         int i;
504
505         cfs_read_lock(&obd_dev_lock);
506         for (i = 0; i < class_devno_max(); i++) {
507                 struct obd_device *obd = class_num2obd(i);
508
509                 if (obd == NULL)
510                         continue;
511                 if ((strncmp(obd->obd_type->typ_name, typ_name,
512                              strlen(typ_name)) == 0)) {
513                         if (obd_uuid_equals(tgt_uuid,
514                                             &obd->u.cli.cl_target_uuid) &&
515                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
516                                                          &obd->obd_uuid) : 1)) {
517                                 cfs_read_unlock(&obd_dev_lock);
518                                 return obd;
519                         }
520                 }
521         }
522         cfs_read_unlock(&obd_dev_lock);
523
524         return NULL;
525 }
526 EXPORT_SYMBOL(class_find_client_obd);
527
528 /* Iterate the obd_device list looking devices have grp_uuid. Start
529    searching at *next, and if a device is found, the next index to look
530    at is saved in *next. If next is NULL, then the first matching device
531    will always be returned. */
532 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
533 {
534         int i;
535
536         if (next == NULL)
537                 i = 0;
538         else if (*next >= 0 && *next < class_devno_max())
539                 i = *next;
540         else
541                 return NULL;
542
543         cfs_read_lock(&obd_dev_lock);
544         for (; i < class_devno_max(); i++) {
545                 struct obd_device *obd = class_num2obd(i);
546
547                 if (obd == NULL)
548                         continue;
549                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
550                         if (next != NULL)
551                                 *next = i+1;
552                         cfs_read_unlock(&obd_dev_lock);
553                         return obd;
554                 }
555         }
556         cfs_read_unlock(&obd_dev_lock);
557
558         return NULL;
559 }
560 EXPORT_SYMBOL(class_devices_in_group);
561
562 /**
563  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
564  * adjust sptlrpc settings accordingly.
565  */
566 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
567 {
568         struct obd_device  *obd;
569         const char         *type;
570         int                 i, rc = 0, rc2;
571
572         LASSERT(namelen > 0);
573
574         cfs_read_lock(&obd_dev_lock);
575         for (i = 0; i < class_devno_max(); i++) {
576                 obd = class_num2obd(i);
577
578                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
579                         continue;
580
581                 /* only notify mdc, osc, mdt, ost */
582                 type = obd->obd_type->typ_name;
583                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
584                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
585                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
586                     strcmp(type, LUSTRE_OST_NAME) != 0)
587                         continue;
588
589                 if (strncmp(obd->obd_name, fsname, namelen))
590                         continue;
591
592                 class_incref(obd, __FUNCTION__, obd);
593                 cfs_read_unlock(&obd_dev_lock);
594                 rc2 = obd_set_info_async(obd->obd_self_export,
595                                          sizeof(KEY_SPTLRPC_CONF),
596                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
597                 rc = rc ? rc : rc2;
598                 class_decref(obd, __FUNCTION__, obd);
599                 cfs_read_lock(&obd_dev_lock);
600         }
601         cfs_read_unlock(&obd_dev_lock);
602         return rc;
603 }
604 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
605
606 void obd_cleanup_caches(void)
607 {
608         int rc;
609
610         ENTRY;
611         if (obd_device_cachep) {
612                 rc = cfs_mem_cache_destroy(obd_device_cachep);
613                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
614                 obd_device_cachep = NULL;
615         }
616         if (obdo_cachep) {
617                 rc = cfs_mem_cache_destroy(obdo_cachep);
618                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
619                 obdo_cachep = NULL;
620         }
621         if (import_cachep) {
622                 rc = cfs_mem_cache_destroy(import_cachep);
623                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
624                 import_cachep = NULL;
625         }
626         if (capa_cachep) {
627                 rc = cfs_mem_cache_destroy(capa_cachep);
628                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
629                 capa_cachep = NULL;
630         }
631         EXIT;
632 }
633
634 int obd_init_caches(void)
635 {
636         ENTRY;
637
638         LASSERT(obd_device_cachep == NULL);
639         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
640                                                  sizeof(struct obd_device),
641                                                  0, 0);
642         if (!obd_device_cachep)
643                 GOTO(out, -ENOMEM);
644
645         LASSERT(obdo_cachep == NULL);
646         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
647                                            0, 0);
648         if (!obdo_cachep)
649                 GOTO(out, -ENOMEM);
650
651         LASSERT(import_cachep == NULL);
652         import_cachep = cfs_mem_cache_create("ll_import_cache",
653                                              sizeof(struct obd_import),
654                                              0, 0);
655         if (!import_cachep)
656                 GOTO(out, -ENOMEM);
657
658         LASSERT(capa_cachep == NULL);
659         capa_cachep = cfs_mem_cache_create("capa_cache",
660                                            sizeof(struct obd_capa), 0, 0);
661         if (!capa_cachep)
662                 GOTO(out, -ENOMEM);
663
664         RETURN(0);
665  out:
666         obd_cleanup_caches();
667         RETURN(-ENOMEM);
668
669 }
670
671 /* map connection to client */
672 struct obd_export *class_conn2export(struct lustre_handle *conn)
673 {
674         struct obd_export *export;
675         ENTRY;
676
677         if (!conn) {
678                 CDEBUG(D_CACHE, "looking for null handle\n");
679                 RETURN(NULL);
680         }
681
682         if (conn->cookie == -1) {  /* this means assign a new connection */
683                 CDEBUG(D_CACHE, "want a new connection\n");
684                 RETURN(NULL);
685         }
686
687         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
688         export = class_handle2object(conn->cookie);
689         RETURN(export);
690 }
691 EXPORT_SYMBOL(class_conn2export);
692
693 struct obd_device *class_exp2obd(struct obd_export *exp)
694 {
695         if (exp)
696                 return exp->exp_obd;
697         return NULL;
698 }
699 EXPORT_SYMBOL(class_exp2obd);
700
701 struct obd_device *class_conn2obd(struct lustre_handle *conn)
702 {
703         struct obd_export *export;
704         export = class_conn2export(conn);
705         if (export) {
706                 struct obd_device *obd = export->exp_obd;
707                 class_export_put(export);
708                 return obd;
709         }
710         return NULL;
711 }
712 EXPORT_SYMBOL(class_conn2obd);
713
714 struct obd_import *class_exp2cliimp(struct obd_export *exp)
715 {
716         struct obd_device *obd = exp->exp_obd;
717         if (obd == NULL)
718                 return NULL;
719         return obd->u.cli.cl_import;
720 }
721 EXPORT_SYMBOL(class_exp2cliimp);
722
723 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
724 {
725         struct obd_device *obd = class_conn2obd(conn);
726         if (obd == NULL)
727                 return NULL;
728         return obd->u.cli.cl_import;
729 }
730 EXPORT_SYMBOL(class_conn2cliimp);
731
732 /* Export management functions */
733
734 /* if export is involved in recovery then clean up related things */
735 void class_export_recovery_cleanup(struct obd_export *exp)
736 {
737         struct obd_device *obd = exp->exp_obd;
738
739         cfs_spin_lock(&obd->obd_recovery_task_lock);
740         if (exp->exp_delayed)
741                 obd->obd_delayed_clients--;
742         if (obd->obd_recovering && exp->exp_in_recovery) {
743                 cfs_spin_lock(&exp->exp_lock);
744                 exp->exp_in_recovery = 0;
745                 cfs_spin_unlock(&exp->exp_lock);
746                 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
747                 cfs_atomic_dec(&obd->obd_connected_clients);
748         }
749         cfs_spin_unlock(&obd->obd_recovery_task_lock);
750         /** Cleanup req replay fields */
751         if (exp->exp_req_replay_needed) {
752                 cfs_spin_lock(&exp->exp_lock);
753                 exp->exp_req_replay_needed = 0;
754                 cfs_spin_unlock(&exp->exp_lock);
755                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
756                 cfs_atomic_dec(&obd->obd_req_replay_clients);
757         }
758         /** Cleanup lock replay data */
759         if (exp->exp_lock_replay_needed) {
760                 cfs_spin_lock(&exp->exp_lock);
761                 exp->exp_lock_replay_needed = 0;
762                 cfs_spin_unlock(&exp->exp_lock);
763                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
764                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
765         }
766 }
767
768 static void class_export_destroy(struct obd_export *exp)
769 {
770         struct obd_device *obd = exp->exp_obd;
771         ENTRY;
772
773         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
774         LASSERT(obd != NULL);
775
776         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
777                exp->exp_client_uuid.uuid, obd->obd_name);
778
779
780         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
781         if (exp->exp_connection)
782                 ptlrpc_put_connection_superhack(exp->exp_connection);
783
784         LASSERT(cfs_list_empty(&exp->exp_flock_wait_list));
785         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
786         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
787         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
788         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
789         obd_destroy_export(exp);
790         class_decref(obd, "export", exp);
791
792         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
793         EXIT;
794 }
795
796 static void export_handle_addref(void *export)
797 {
798         class_export_get(export);
799 }
800
801 struct obd_export *class_export_get(struct obd_export *exp)
802 {
803         cfs_atomic_inc(&exp->exp_refcount);
804         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
805                cfs_atomic_read(&exp->exp_refcount));
806         return exp;
807 }
808 EXPORT_SYMBOL(class_export_get);
809
810 void class_export_put(struct obd_export *exp)
811 {
812         LASSERT(exp != NULL);
813         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
814         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
815                cfs_atomic_read(&exp->exp_refcount) - 1);
816
817         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
818                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
819                 CDEBUG(D_IOCTL, "final put %p/%s\n",
820                        exp, exp->exp_client_uuid.uuid);
821
822                 /* release nid stat refererence */
823                 lprocfs_exp_cleanup(exp);
824                 class_export_recovery_cleanup(exp);
825
826                 obd_zombie_export_add(exp);
827         }
828 }
829 EXPORT_SYMBOL(class_export_put);
830
831 /* Creates a new export, adds it to the hash table, and returns a
832  * pointer to it. The refcount is 2: one for the hash reference, and
833  * one for the pointer returned by this function. */
834 struct obd_export *class_new_export(struct obd_device *obd,
835                                     struct obd_uuid *cluuid)
836 {
837         struct obd_export *export;
838         cfs_hash_t *hash = NULL;
839         int rc = 0;
840         ENTRY;
841
842         OBD_ALLOC_PTR(export);
843         if (!export)
844                 return ERR_PTR(-ENOMEM);
845
846         export->exp_conn_cnt = 0;
847         export->exp_lock_hash = NULL;
848         cfs_atomic_set(&export->exp_refcount, 2);
849         cfs_atomic_set(&export->exp_rpc_count, 0);
850         cfs_atomic_set(&export->exp_cb_count, 0);
851         cfs_atomic_set(&export->exp_locks_count, 0);
852 #if LUSTRE_TRACKS_LOCK_EXP_REFS
853         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
854         cfs_spin_lock_init(&export->exp_locks_list_guard);
855 #endif
856         cfs_atomic_set(&export->exp_replay_count, 0);
857         export->exp_obd = obd;
858         CFS_INIT_LIST_HEAD(&export->exp_flock_wait_list);
859         cfs_rwlock_init(&export->exp_flock_wait_lock);
860         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
861         cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
862         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
863         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
864         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
865         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
866         class_handle_hash(&export->exp_handle, export_handle_addref);
867         export->exp_last_request_time = cfs_time_current_sec();
868         cfs_spin_lock_init(&export->exp_lock);
869         cfs_spin_lock_init(&export->exp_rpc_lock);
870         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
871         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
872         cfs_spin_lock_init(&export->exp_bl_list_lock);
873         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
874
875         export->exp_sp_peer = LUSTRE_SP_ANY;
876         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
877         export->exp_client_uuid = *cluuid;
878         obd_init_export(export);
879
880         cfs_spin_lock(&obd->obd_dev_lock);
881          /* shouldn't happen, but might race */
882         if (obd->obd_stopping)
883                 GOTO(exit_unlock, rc = -ENODEV);
884
885         hash = cfs_hash_getref(obd->obd_uuid_hash);
886         if (hash == NULL)
887                 GOTO(exit_unlock, rc = -ENODEV);
888         cfs_spin_unlock(&obd->obd_dev_lock);
889
890         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
891                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
892                 if (rc != 0) {
893                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
894                                       obd->obd_name, cluuid->uuid, rc);
895                         GOTO(exit_err, rc = -EALREADY);
896                 }
897         }
898
899         cfs_spin_lock(&obd->obd_dev_lock);
900         if (obd->obd_stopping) {
901                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
902                 GOTO(exit_unlock, rc = -ENODEV);
903         }
904
905         class_incref(obd, "export", export);
906         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
907         cfs_list_add_tail(&export->exp_obd_chain_timed,
908                           &export->exp_obd->obd_exports_timed);
909         export->exp_obd->obd_num_exports++;
910         cfs_spin_unlock(&obd->obd_dev_lock);
911         cfs_hash_putref(hash);
912         RETURN(export);
913
914 exit_unlock:
915         cfs_spin_unlock(&obd->obd_dev_lock);
916 exit_err:
917         if (hash)
918                 cfs_hash_putref(hash);
919         class_handle_unhash(&export->exp_handle);
920         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
921         obd_destroy_export(export);
922         OBD_FREE_PTR(export);
923         return ERR_PTR(rc);
924 }
925 EXPORT_SYMBOL(class_new_export);
926
927 void class_unlink_export(struct obd_export *exp)
928 {
929         class_handle_unhash(&exp->exp_handle);
930
931         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
932         /* delete an uuid-export hashitem from hashtables */
933         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
934                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
935                              &exp->exp_client_uuid,
936                              &exp->exp_uuid_hash);
937
938         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
939         cfs_list_del_init(&exp->exp_obd_chain_timed);
940         exp->exp_obd->obd_num_exports--;
941         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
942         class_export_put(exp);
943 }
944 EXPORT_SYMBOL(class_unlink_export);
945
946 /* Import management functions */
947 void class_import_destroy(struct obd_import *imp)
948 {
949         ENTRY;
950
951         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
952                 imp->imp_obd->obd_name);
953
954         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
955
956         ptlrpc_put_connection_superhack(imp->imp_connection);
957
958         while (!cfs_list_empty(&imp->imp_conn_list)) {
959                 struct obd_import_conn *imp_conn;
960
961                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
962                                           struct obd_import_conn, oic_item);
963                 cfs_list_del_init(&imp_conn->oic_item);
964                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
965                 OBD_FREE(imp_conn, sizeof(*imp_conn));
966         }
967
968         LASSERT(imp->imp_sec == NULL);
969         class_decref(imp->imp_obd, "import", imp);
970         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
971         EXIT;
972 }
973
974 static void import_handle_addref(void *import)
975 {
976         class_import_get(import);
977 }
978
979 struct obd_import *class_import_get(struct obd_import *import)
980 {
981         cfs_atomic_inc(&import->imp_refcount);
982         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
983                cfs_atomic_read(&import->imp_refcount),
984                import->imp_obd->obd_name);
985         return import;
986 }
987 EXPORT_SYMBOL(class_import_get);
988
989 void class_import_put(struct obd_import *imp)
990 {
991         ENTRY;
992
993         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
994         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
995
996         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
997                cfs_atomic_read(&imp->imp_refcount) - 1,
998                imp->imp_obd->obd_name);
999
1000         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1001                 CDEBUG(D_INFO, "final put import %p\n", imp);
1002                 obd_zombie_import_add(imp);
1003         }
1004
1005         EXIT;
1006 }
1007 EXPORT_SYMBOL(class_import_put);
1008
1009 static void init_imp_at(struct imp_at *at) {
1010         int i;
1011         at_init(&at->iat_net_latency, 0, 0);
1012         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1013                 /* max service estimates are tracked on the server side, so
1014                    don't use the AT history here, just use the last reported
1015                    val. (But keep hist for proc histogram, worst_ever) */
1016                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1017                         AT_FLG_NOHIST);
1018         }
1019 }
1020
1021 struct obd_import *class_new_import(struct obd_device *obd)
1022 {
1023         struct obd_import *imp;
1024
1025         OBD_ALLOC(imp, sizeof(*imp));
1026         if (imp == NULL)
1027                 return NULL;
1028
1029         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1030         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1031         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1032         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1033         cfs_spin_lock_init(&imp->imp_lock);
1034         imp->imp_last_success_conn = 0;
1035         imp->imp_state = LUSTRE_IMP_NEW;
1036         imp->imp_obd = class_incref(obd, "import", imp);
1037         cfs_mutex_init(&imp->imp_sec_mutex);
1038         cfs_waitq_init(&imp->imp_recovery_waitq);
1039
1040         cfs_atomic_set(&imp->imp_refcount, 2);
1041         cfs_atomic_set(&imp->imp_unregistering, 0);
1042         cfs_atomic_set(&imp->imp_inflight, 0);
1043         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1044         cfs_atomic_set(&imp->imp_inval_count, 0);
1045         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1046         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1047         class_handle_hash(&imp->imp_handle, import_handle_addref);
1048         init_imp_at(&imp->imp_at);
1049
1050         /* the default magic is V2, will be used in connect RPC, and
1051          * then adjusted according to the flags in request/reply. */
1052         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1053
1054         return imp;
1055 }
1056 EXPORT_SYMBOL(class_new_import);
1057
1058 void class_destroy_import(struct obd_import *import)
1059 {
1060         LASSERT(import != NULL);
1061         LASSERT(import != LP_POISON);
1062
1063         class_handle_unhash(&import->imp_handle);
1064
1065         cfs_spin_lock(&import->imp_lock);
1066         import->imp_generation++;
1067         cfs_spin_unlock(&import->imp_lock);
1068         class_import_put(import);
1069 }
1070 EXPORT_SYMBOL(class_destroy_import);
1071
1072 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1073
1074 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1075 {
1076         cfs_spin_lock(&exp->exp_locks_list_guard);
1077
1078         LASSERT(lock->l_exp_refs_nr >= 0);
1079
1080         if (lock->l_exp_refs_target != NULL &&
1081             lock->l_exp_refs_target != exp) {
1082                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1083                               exp, lock, lock->l_exp_refs_target);
1084         }
1085         if ((lock->l_exp_refs_nr ++) == 0) {
1086                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1087                 lock->l_exp_refs_target = exp;
1088         }
1089         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1090                lock, exp, lock->l_exp_refs_nr);
1091         cfs_spin_unlock(&exp->exp_locks_list_guard);
1092 }
1093 EXPORT_SYMBOL(__class_export_add_lock_ref);
1094
1095 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1096 {
1097         cfs_spin_lock(&exp->exp_locks_list_guard);
1098         LASSERT(lock->l_exp_refs_nr > 0);
1099         if (lock->l_exp_refs_target != exp) {
1100                 LCONSOLE_WARN("lock %p, "
1101                               "mismatching export pointers: %p, %p\n",
1102                               lock, lock->l_exp_refs_target, exp);
1103         }
1104         if (-- lock->l_exp_refs_nr == 0) {
1105                 cfs_list_del_init(&lock->l_exp_refs_link);
1106                 lock->l_exp_refs_target = NULL;
1107         }
1108         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1109                lock, exp, lock->l_exp_refs_nr);
1110         cfs_spin_unlock(&exp->exp_locks_list_guard);
1111 }
1112 EXPORT_SYMBOL(__class_export_del_lock_ref);
1113 #endif
1114
1115 /* A connection defines an export context in which preallocation can
1116    be managed. This releases the export pointer reference, and returns
1117    the export handle, so the export refcount is 1 when this function
1118    returns. */
1119 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1120                   struct obd_uuid *cluuid)
1121 {
1122         struct obd_export *export;
1123         LASSERT(conn != NULL);
1124         LASSERT(obd != NULL);
1125         LASSERT(cluuid != NULL);
1126         ENTRY;
1127
1128         export = class_new_export(obd, cluuid);
1129         if (IS_ERR(export))
1130                 RETURN(PTR_ERR(export));
1131
1132         conn->cookie = export->exp_handle.h_cookie;
1133         class_export_put(export);
1134
1135         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1136                cluuid->uuid, conn->cookie);
1137         RETURN(0);
1138 }
1139 EXPORT_SYMBOL(class_connect);
1140
1141
1142 /* This function removes 1-3 references from the export:
1143  * 1 - for export pointer passed
1144  * and if disconnect really need
1145  * 2 - removing from hash
1146  * 3 - in client_unlink_export
1147  * The export pointer passed to this function can destroyed */
1148 int class_disconnect(struct obd_export *export)
1149 {
1150         int already_disconnected;
1151         ENTRY;
1152
1153         if (export == NULL) {
1154                 CWARN("attempting to free NULL export %p\n", export);
1155                 RETURN(-EINVAL);
1156         }
1157
1158         cfs_spin_lock(&export->exp_lock);
1159         already_disconnected = export->exp_disconnected;
1160         export->exp_disconnected = 1;
1161         cfs_spin_unlock(&export->exp_lock);
1162
1163         /* class_cleanup(), abort_recovery(), and class_fail_export()
1164          * all end up in here, and if any of them race we shouldn't
1165          * call extra class_export_puts(). */
1166         if (already_disconnected) {
1167                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1168                 GOTO(no_disconn, already_disconnected);
1169         }
1170
1171         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1172                export->exp_handle.h_cookie);
1173
1174         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1175                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1176                              &export->exp_connection->c_peer.nid,
1177                              &export->exp_nid_hash);
1178
1179         class_unlink_export(export);
1180 no_disconn:
1181         class_export_put(export);
1182         RETURN(0);
1183 }
1184 EXPORT_SYMBOL(class_disconnect);
1185
1186 /* Return non-zero for a fully connected export */
1187 int class_connected_export(struct obd_export *exp)
1188 {
1189         if (exp) {
1190                 int connected;
1191                 cfs_spin_lock(&exp->exp_lock);
1192                 connected = (exp->exp_conn_cnt > 0);
1193                 cfs_spin_unlock(&exp->exp_lock);
1194                 return connected;
1195         }
1196         return 0;
1197 }
1198 EXPORT_SYMBOL(class_connected_export);
1199
1200 static void class_disconnect_export_list(cfs_list_t *list,
1201                                          enum obd_option flags)
1202 {
1203         int rc;
1204         struct obd_export *exp;
1205         ENTRY;
1206
1207         /* It's possible that an export may disconnect itself, but
1208          * nothing else will be added to this list. */
1209         while (!cfs_list_empty(list)) {
1210                 exp = cfs_list_entry(list->next, struct obd_export,
1211                                      exp_obd_chain);
1212                 /* need for safe call CDEBUG after obd_disconnect */
1213                 class_export_get(exp);
1214
1215                 cfs_spin_lock(&exp->exp_lock);
1216                 exp->exp_flags = flags;
1217                 cfs_spin_unlock(&exp->exp_lock);
1218
1219                 if (obd_uuid_equals(&exp->exp_client_uuid,
1220                                     &exp->exp_obd->obd_uuid)) {
1221                         CDEBUG(D_HA,
1222                                "exp %p export uuid == obd uuid, don't discon\n",
1223                                exp);
1224                         /* Need to delete this now so we don't end up pointing
1225                          * to work_list later when this export is cleaned up. */
1226                         cfs_list_del_init(&exp->exp_obd_chain);
1227                         class_export_put(exp);
1228                         continue;
1229                 }
1230
1231                 class_export_get(exp);
1232                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1233                        "last request at "CFS_TIME_T"\n",
1234                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1235                        exp, exp->exp_last_request_time);
1236                 /* release one export reference anyway */
1237                 rc = obd_disconnect(exp);
1238
1239                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1240                        obd_export_nid2str(exp), exp, rc);
1241                 class_export_put(exp);
1242         }
1243         EXIT;
1244 }
1245
1246 void class_disconnect_exports(struct obd_device *obd)
1247 {
1248         cfs_list_t work_list;
1249         ENTRY;
1250
1251         /* Move all of the exports from obd_exports to a work list, en masse. */
1252         CFS_INIT_LIST_HEAD(&work_list);
1253         cfs_spin_lock(&obd->obd_dev_lock);
1254         cfs_list_splice_init(&obd->obd_exports, &work_list);
1255         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1256         cfs_spin_unlock(&obd->obd_dev_lock);
1257
1258         if (!cfs_list_empty(&work_list)) {
1259                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1260                        "disconnecting them\n", obd->obd_minor, obd);
1261                 class_disconnect_export_list(&work_list,
1262                                              exp_flags_from_obd(obd));
1263         } else
1264                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1265                        obd->obd_minor, obd);
1266         EXIT;
1267 }
1268 EXPORT_SYMBOL(class_disconnect_exports);
1269
1270 /* Remove exports that have not completed recovery.
1271  */
1272 void class_disconnect_stale_exports(struct obd_device *obd,
1273                                     int (*test_export)(struct obd_export *))
1274 {
1275         cfs_list_t work_list;
1276         cfs_list_t *pos, *n;
1277         struct obd_export *exp;
1278         int evicted = 0;
1279         ENTRY;
1280
1281         CFS_INIT_LIST_HEAD(&work_list);
1282         cfs_spin_lock(&obd->obd_dev_lock);
1283         cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1284                 int failed;
1285
1286                 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1287
1288                 /* don't count self-export as client */
1289                 if (obd_uuid_equals(&exp->exp_client_uuid,
1290                                     &exp->exp_obd->obd_uuid))
1291                         continue;
1292
1293                 if (test_export(exp))
1294                         continue;
1295
1296                 cfs_spin_lock(&exp->exp_lock);
1297                 failed = exp->exp_failed;
1298                 exp->exp_failed = 1;
1299                 cfs_spin_unlock(&exp->exp_lock);
1300                 if (failed)
1301                         continue;
1302
1303                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1304                 evicted++;
1305                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1306                        obd->obd_name, exp->exp_client_uuid.uuid,
1307                        exp->exp_connection == NULL ? "<unknown>" :
1308                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1309                 print_export_data(exp, "EVICTING", 0);
1310         }
1311         cfs_spin_unlock(&obd->obd_dev_lock);
1312
1313         if (evicted) {
1314                 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1315                        obd->obd_name, evicted);
1316                 obd->obd_stale_clients += evicted;
1317         }
1318         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1319                                                  OBD_OPT_ABORT_RECOV);
1320         EXIT;
1321 }
1322 EXPORT_SYMBOL(class_disconnect_stale_exports);
1323
1324 void class_fail_export(struct obd_export *exp)
1325 {
1326         int rc, already_failed;
1327
1328         cfs_spin_lock(&exp->exp_lock);
1329         already_failed = exp->exp_failed;
1330         exp->exp_failed = 1;
1331         cfs_spin_unlock(&exp->exp_lock);
1332
1333         if (already_failed) {
1334                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1335                        exp, exp->exp_client_uuid.uuid);
1336                 return;
1337         }
1338
1339         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1340                exp, exp->exp_client_uuid.uuid);
1341
1342         if (obd_dump_on_timeout)
1343                 libcfs_debug_dumplog();
1344
1345         /* need for safe call CDEBUG after obd_disconnect */
1346         class_export_get(exp);
1347
1348         /* Most callers into obd_disconnect are removing their own reference
1349          * (request, for example) in addition to the one from the hash table.
1350          * We don't have such a reference here, so make one. */
1351         class_export_get(exp);
1352         rc = obd_disconnect(exp);
1353         if (rc)
1354                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1355         else
1356                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1357                        exp, exp->exp_client_uuid.uuid);
1358         class_export_put(exp);
1359 }
1360 EXPORT_SYMBOL(class_fail_export);
1361
1362 char *obd_export_nid2str(struct obd_export *exp)
1363 {
1364         if (exp->exp_connection != NULL)
1365                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1366
1367         return "(no nid)";
1368 }
1369 EXPORT_SYMBOL(obd_export_nid2str);
1370
1371 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1372 {
1373         struct obd_export *doomed_exp = NULL;
1374         int exports_evicted = 0;
1375
1376         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1377
1378         do {
1379                 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1380                 if (doomed_exp == NULL)
1381                         break;
1382
1383                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1384                          "nid %s found, wanted nid %s, requested nid %s\n",
1385                          obd_export_nid2str(doomed_exp),
1386                          libcfs_nid2str(nid_key), nid);
1387                 LASSERTF(doomed_exp != obd->obd_self_export,
1388                          "self-export is hashed by NID?\n");
1389                 exports_evicted++;
1390                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1391                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1392                        exports_evicted);
1393                 class_fail_export(doomed_exp);
1394                 class_export_put(doomed_exp);
1395         } while (1);
1396
1397         if (!exports_evicted)
1398                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1399                        obd->obd_name, nid);
1400         return exports_evicted;
1401 }
1402 EXPORT_SYMBOL(obd_export_evict_by_nid);
1403
1404 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1405 {
1406         struct obd_export *doomed_exp = NULL;
1407         struct obd_uuid doomed_uuid;
1408         int exports_evicted = 0;
1409
1410         obd_str2uuid(&doomed_uuid, uuid);
1411         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1412                 CERROR("%s: can't evict myself\n", obd->obd_name);
1413                 return exports_evicted;
1414         }
1415
1416         doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1417
1418         if (doomed_exp == NULL) {
1419                 CERROR("%s: can't disconnect %s: no exports found\n",
1420                        obd->obd_name, uuid);
1421         } else {
1422                 CWARN("%s: evicting %s at adminstrative request\n",
1423                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1424                 class_fail_export(doomed_exp);
1425                 class_export_put(doomed_exp);
1426                 exports_evicted++;
1427         }
1428
1429         return exports_evicted;
1430 }
1431 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1432
1433 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1434 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1435 EXPORT_SYMBOL(class_export_dump_hook);
1436 #endif
1437
1438 static void print_export_data(struct obd_export *exp, const char *status,
1439                               int locks)
1440 {
1441         struct ptlrpc_reply_state *rs;
1442         struct ptlrpc_reply_state *first_reply = NULL;
1443         int nreplies = 0;
1444
1445         cfs_spin_lock(&exp->exp_lock);
1446         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1447                                 rs_exp_list) {
1448                 if (nreplies == 0)
1449                         first_reply = rs;
1450                 nreplies++;
1451         }
1452         cfs_spin_unlock(&exp->exp_lock);
1453
1454         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1455                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1456                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1457                cfs_atomic_read(&exp->exp_rpc_count),
1458                cfs_atomic_read(&exp->exp_cb_count),
1459                cfs_atomic_read(&exp->exp_locks_count),
1460                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1461                nreplies, first_reply, nreplies > 3 ? "..." : "",
1462                exp->exp_last_committed);
1463 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1464         if (locks && class_export_dump_hook != NULL)
1465                 class_export_dump_hook(exp);
1466 #endif
1467 }
1468
1469 void dump_exports(struct obd_device *obd, int locks)
1470 {
1471         struct obd_export *exp;
1472
1473         cfs_spin_lock(&obd->obd_dev_lock);
1474         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1475                 print_export_data(exp, "ACTIVE", locks);
1476         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1477                 print_export_data(exp, "UNLINKED", locks);
1478         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1479                 print_export_data(exp, "DELAYED", locks);
1480         cfs_spin_unlock(&obd->obd_dev_lock);
1481         cfs_spin_lock(&obd_zombie_impexp_lock);
1482         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1483                 print_export_data(exp, "ZOMBIE", locks);
1484         cfs_spin_unlock(&obd_zombie_impexp_lock);
1485 }
1486 EXPORT_SYMBOL(dump_exports);
1487
1488 void obd_exports_barrier(struct obd_device *obd)
1489 {
1490         int waited = 2;
1491         LASSERT(cfs_list_empty(&obd->obd_exports));
1492         cfs_spin_lock(&obd->obd_dev_lock);
1493         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1494                 cfs_spin_unlock(&obd->obd_dev_lock);
1495                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1496                                                    cfs_time_seconds(waited));
1497                 if (waited > 5 && IS_PO2(waited)) {
1498                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1499                                       "more than %d seconds. "
1500                                       "The obd refcount = %d. Is it stuck?\n",
1501                                       obd->obd_name, waited,
1502                                       cfs_atomic_read(&obd->obd_refcount));
1503                         dump_exports(obd, 1);
1504                 }
1505                 waited *= 2;
1506                 cfs_spin_lock(&obd->obd_dev_lock);
1507         }
1508         cfs_spin_unlock(&obd->obd_dev_lock);
1509 }
1510 EXPORT_SYMBOL(obd_exports_barrier);
1511
1512 /* Total amount of zombies to be destroyed */
1513 static int zombies_count = 0;
1514
1515 /**
1516  * kill zombie imports and exports
1517  */
1518 void obd_zombie_impexp_cull(void)
1519 {
1520         struct obd_import *import;
1521         struct obd_export *export;
1522         ENTRY;
1523
1524         do {
1525                 cfs_spin_lock(&obd_zombie_impexp_lock);
1526
1527                 import = NULL;
1528                 if (!cfs_list_empty(&obd_zombie_imports)) {
1529                         import = cfs_list_entry(obd_zombie_imports.next,
1530                                                 struct obd_import,
1531                                                 imp_zombie_chain);
1532                         cfs_list_del_init(&import->imp_zombie_chain);
1533                 }
1534
1535                 export = NULL;
1536                 if (!cfs_list_empty(&obd_zombie_exports)) {
1537                         export = cfs_list_entry(obd_zombie_exports.next,
1538                                                 struct obd_export,
1539                                                 exp_obd_chain);
1540                         cfs_list_del_init(&export->exp_obd_chain);
1541                 }
1542
1543                 cfs_spin_unlock(&obd_zombie_impexp_lock);
1544
1545                 if (import != NULL) {
1546                         class_import_destroy(import);
1547                         cfs_spin_lock(&obd_zombie_impexp_lock);
1548                         zombies_count--;
1549                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1550                 }
1551
1552                 if (export != NULL) {
1553                         class_export_destroy(export);
1554                         cfs_spin_lock(&obd_zombie_impexp_lock);
1555                         zombies_count--;
1556                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1557                 }
1558
1559                 cfs_cond_resched();
1560         } while (import != NULL || export != NULL);
1561         EXIT;
1562 }
1563
1564 static cfs_completion_t         obd_zombie_start;
1565 static cfs_completion_t         obd_zombie_stop;
1566 static unsigned long            obd_zombie_flags;
1567 static cfs_waitq_t              obd_zombie_waitq;
1568 static pid_t                    obd_zombie_pid;
1569
1570 enum {
1571         OBD_ZOMBIE_STOP   = 1 << 1
1572 };
1573
1574 /**
1575  * check for work for kill zombie import/export thread.
1576  */
1577 static int obd_zombie_impexp_check(void *arg)
1578 {
1579         int rc;
1580
1581         cfs_spin_lock(&obd_zombie_impexp_lock);
1582         rc = (zombies_count == 0) &&
1583              !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1584         cfs_spin_unlock(&obd_zombie_impexp_lock);
1585
1586         RETURN(rc);
1587 }
1588
1589 /**
1590  * Add export to the obd_zombe thread and notify it.
1591  */
1592 static void obd_zombie_export_add(struct obd_export *exp) {
1593         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1594         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1595         cfs_list_del_init(&exp->exp_obd_chain);
1596         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1597         cfs_spin_lock(&obd_zombie_impexp_lock);
1598         zombies_count++;
1599         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1600         cfs_spin_unlock(&obd_zombie_impexp_lock);
1601
1602         obd_zombie_impexp_notify();
1603 }
1604
1605 /**
1606  * Add import to the obd_zombe thread and notify it.
1607  */
1608 static void obd_zombie_import_add(struct obd_import *imp) {
1609         LASSERT(imp->imp_sec == NULL);
1610         LASSERT(imp->imp_rq_pool == NULL);
1611         cfs_spin_lock(&obd_zombie_impexp_lock);
1612         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1613         zombies_count++;
1614         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1615         cfs_spin_unlock(&obd_zombie_impexp_lock);
1616
1617         obd_zombie_impexp_notify();
1618 }
1619
1620 /**
1621  * notify import/export destroy thread about new zombie.
1622  */
1623 static void obd_zombie_impexp_notify(void)
1624 {
1625         /*
1626          * Make sure obd_zomebie_impexp_thread get this notification.
1627          * It is possible this signal only get by obd_zombie_barrier, and
1628          * barrier gulps this notification and sleeps away and hangs ensues
1629          */
1630         cfs_waitq_broadcast(&obd_zombie_waitq);
1631 }
1632
1633 /**
1634  * check whether obd_zombie is idle
1635  */
1636 static int obd_zombie_is_idle(void)
1637 {
1638         int rc;
1639
1640         LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1641         cfs_spin_lock(&obd_zombie_impexp_lock);
1642         rc = (zombies_count == 0);
1643         cfs_spin_unlock(&obd_zombie_impexp_lock);
1644         return rc;
1645 }
1646
1647 /**
1648  * wait when obd_zombie import/export queues become empty
1649  */
1650 void obd_zombie_barrier(void)
1651 {
1652         struct l_wait_info lwi = { 0 };
1653
1654         if (obd_zombie_pid == cfs_curproc_pid())
1655                 /* don't wait for myself */
1656                 return;
1657         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1658 }
1659 EXPORT_SYMBOL(obd_zombie_barrier);
1660
1661 #ifdef __KERNEL__
1662
1663 /**
1664  * destroy zombie export/import thread.
1665  */
1666 static int obd_zombie_impexp_thread(void *unused)
1667 {
1668         int rc;
1669
1670         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1671                 cfs_complete(&obd_zombie_start);
1672                 RETURN(rc);
1673         }
1674
1675         cfs_complete(&obd_zombie_start);
1676
1677         obd_zombie_pid = cfs_curproc_pid();
1678
1679         while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1680                 struct l_wait_info lwi = { 0 };
1681
1682                 l_wait_event(obd_zombie_waitq,
1683                              !obd_zombie_impexp_check(NULL), &lwi);
1684                 obd_zombie_impexp_cull();
1685
1686                 /*
1687                  * Notify obd_zombie_barrier callers that queues
1688                  * may be empty.
1689                  */
1690                 cfs_waitq_signal(&obd_zombie_waitq);
1691         }
1692
1693         cfs_complete(&obd_zombie_stop);
1694
1695         RETURN(0);
1696 }
1697
1698 #else /* ! KERNEL */
1699
1700 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1701 static void *obd_zombie_impexp_work_cb;
1702 static void *obd_zombie_impexp_idle_cb;
1703
1704 int obd_zombie_impexp_kill(void *arg)
1705 {
1706         int rc = 0;
1707
1708         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1709                 obd_zombie_impexp_cull();
1710                 rc = 1;
1711         }
1712         cfs_atomic_dec(&zombie_recur);
1713         return rc;
1714 }
1715
1716 #endif
1717
1718 /**
1719  * start destroy zombie import/export thread
1720  */
1721 int obd_zombie_impexp_init(void)
1722 {
1723         int rc;
1724
1725         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1726         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1727         cfs_spin_lock_init(&obd_zombie_impexp_lock);
1728         cfs_init_completion(&obd_zombie_start);
1729         cfs_init_completion(&obd_zombie_stop);
1730         cfs_waitq_init(&obd_zombie_waitq);
1731         obd_zombie_pid = 0;
1732
1733 #ifdef __KERNEL__
1734         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1735         if (rc < 0)
1736                 RETURN(rc);
1737
1738         cfs_wait_for_completion(&obd_zombie_start);
1739 #else
1740
1741         obd_zombie_impexp_work_cb =
1742                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1743                                                  &obd_zombie_impexp_kill, NULL);
1744
1745         obd_zombie_impexp_idle_cb =
1746                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1747                                                  &obd_zombie_impexp_check, NULL);
1748         rc = 0;
1749 #endif
1750         RETURN(rc);
1751 }
1752 /**
1753  * stop destroy zombie import/export thread
1754  */
1755 void obd_zombie_impexp_stop(void)
1756 {
1757         cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1758         obd_zombie_impexp_notify();
1759 #ifdef __KERNEL__
1760         cfs_wait_for_completion(&obd_zombie_stop);
1761 #else
1762         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1763         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1764 #endif
1765 }
1766
1767 /***** Kernel-userspace comm helpers *******/
1768
1769 /* Get length of entire message, including header */
1770 int kuc_len(int payload_len)
1771 {
1772         return sizeof(struct kuc_hdr) + payload_len;
1773 }
1774 EXPORT_SYMBOL(kuc_len);
1775
1776 /* Get a pointer to kuc header, given a ptr to the payload
1777  * @param p Pointer to payload area
1778  * @returns Pointer to kuc header
1779  */
1780 struct kuc_hdr * kuc_ptr(void *p)
1781 {
1782         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1783         LASSERT(lh->kuc_magic == KUC_MAGIC);
1784         return lh;
1785 }
1786 EXPORT_SYMBOL(kuc_ptr);
1787
1788 /* Test if payload is part of kuc message
1789  * @param p Pointer to payload area
1790  * @returns boolean
1791  */
1792 int kuc_ispayload(void *p)
1793 {
1794         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1795
1796         if (kh->kuc_magic == KUC_MAGIC)
1797                 return 1;
1798         else
1799                 return 0;
1800 }
1801 EXPORT_SYMBOL(kuc_ispayload);
1802
1803 /* Alloc space for a message, and fill in header
1804  * @return Pointer to payload area
1805  */
1806 void *kuc_alloc(int payload_len, int transport, int type)
1807 {
1808         struct kuc_hdr *lh;
1809         int len = kuc_len(payload_len);
1810
1811         OBD_ALLOC(lh, len);
1812         if (lh == NULL)
1813                 return ERR_PTR(-ENOMEM);
1814
1815         lh->kuc_magic = KUC_MAGIC;
1816         lh->kuc_transport = transport;
1817         lh->kuc_msgtype = type;
1818         lh->kuc_msglen = len;
1819
1820         return (void *)(lh + 1);
1821 }
1822 EXPORT_SYMBOL(kuc_alloc);
1823
1824 /* Takes pointer to payload area */
1825 inline void kuc_free(void *p, int payload_len)
1826 {
1827         struct kuc_hdr *lh = kuc_ptr(p);
1828         OBD_FREE(lh, kuc_len(payload_len));
1829 }
1830 EXPORT_SYMBOL(kuc_free);
1831
1832
1833