Whamcloud - gitweb
LU-1302 llog: structures changes, llog_thread_info
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
50 extern cfs_list_t obd_types;
51 cfs_spinlock_t obd_types_lock;
52
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
57
58 cfs_list_t      obd_zombie_imports;
59 cfs_list_t      obd_zombie_exports;
60 cfs_spinlock_t  obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         cfs_list_t *tmp;
102         struct obd_type *type;
103
104         cfs_spin_lock(&obd_types_lock);
105         cfs_list_for_each(tmp, &obd_types) {
106                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         cfs_spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         cfs_spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115
116 struct obd_type *class_get_type(const char *name)
117 {
118         struct obd_type *type = class_search_type(name);
119
120 #ifdef HAVE_MODULE_LOADING_SUPPORT
121         if (!type) {
122                 const char *modname = name;
123                 if (!cfs_request_module("%s", modname)) {
124                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
125                         type = class_search_type(name);
126                 } else {
127                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
128                                            modname);
129                 }
130         }
131 #endif
132         if (type) {
133                 cfs_spin_lock(&type->obd_type_lock);
134                 type->typ_refcnt++;
135                 cfs_try_module_get(type->typ_dt_ops->o_owner);
136                 cfs_spin_unlock(&type->obd_type_lock);
137         }
138         return type;
139 }
140 EXPORT_SYMBOL(class_get_type);
141
142 void class_put_type(struct obd_type *type)
143 {
144         LASSERT(type);
145         cfs_spin_lock(&type->obd_type_lock);
146         type->typ_refcnt--;
147         cfs_module_put(type->typ_dt_ops->o_owner);
148         cfs_spin_unlock(&type->obd_type_lock);
149 }
150 EXPORT_SYMBOL(class_put_type);
151
152 #define CLASS_MAX_NAME 1024
153
154 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
155                         struct lprocfs_vars *vars, const char *name,
156                         struct lu_device_type *ldt)
157 {
158         struct obd_type *type;
159         int rc = 0;
160         ENTRY;
161
162         /* sanity check */
163         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
164
165         if (class_search_type(name)) {
166                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
167                 RETURN(-EEXIST);
168         }
169
170         rc = -ENOMEM;
171         OBD_ALLOC(type, sizeof(*type));
172         if (type == NULL)
173                 RETURN(rc);
174
175         OBD_ALLOC_PTR(type->typ_dt_ops);
176         OBD_ALLOC_PTR(type->typ_md_ops);
177         OBD_ALLOC(type->typ_name, strlen(name) + 1);
178
179         if (type->typ_dt_ops == NULL ||
180             type->typ_md_ops == NULL ||
181             type->typ_name == NULL)
182                 GOTO (failed, rc);
183
184         *(type->typ_dt_ops) = *dt_ops;
185         /* md_ops is optional */
186         if (md_ops)
187                 *(type->typ_md_ops) = *md_ops;
188         strcpy(type->typ_name, name);
189         cfs_spin_lock_init(&type->obd_type_lock);
190
191 #ifdef LPROCFS
192         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
193                                               vars, type);
194         if (IS_ERR(type->typ_procroot)) {
195                 rc = PTR_ERR(type->typ_procroot);
196                 type->typ_procroot = NULL;
197                 GOTO (failed, rc);
198         }
199 #endif
200         if (ldt != NULL) {
201                 type->typ_lu = ldt;
202                 rc = lu_device_type_init(ldt);
203                 if (rc != 0)
204                         GOTO (failed, rc);
205         }
206
207         cfs_spin_lock(&obd_types_lock);
208         cfs_list_add(&type->typ_chain, &obd_types);
209         cfs_spin_unlock(&obd_types_lock);
210
211         RETURN (0);
212
213  failed:
214         if (type->typ_name != NULL)
215                 OBD_FREE(type->typ_name, strlen(name) + 1);
216         if (type->typ_md_ops != NULL)
217                 OBD_FREE_PTR(type->typ_md_ops);
218         if (type->typ_dt_ops != NULL)
219                 OBD_FREE_PTR(type->typ_dt_ops);
220         OBD_FREE(type, sizeof(*type));
221         RETURN(rc);
222 }
223 EXPORT_SYMBOL(class_register_type);
224
225 int class_unregister_type(const char *name)
226 {
227         struct obd_type *type = class_search_type(name);
228         ENTRY;
229
230         if (!type) {
231                 CERROR("unknown obd type\n");
232                 RETURN(-EINVAL);
233         }
234
235         if (type->typ_refcnt) {
236                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
237                 /* This is a bad situation, let's make the best of it */
238                 /* Remove ops, but leave the name for debugging */
239                 OBD_FREE_PTR(type->typ_dt_ops);
240                 OBD_FREE_PTR(type->typ_md_ops);
241                 RETURN(-EBUSY);
242         }
243
244         if (type->typ_procroot) {
245                 lprocfs_remove(&type->typ_procroot);
246         }
247
248         if (type->typ_lu)
249                 lu_device_type_fini(type->typ_lu);
250
251         cfs_spin_lock(&obd_types_lock);
252         cfs_list_del(&type->typ_chain);
253         cfs_spin_unlock(&obd_types_lock);
254         OBD_FREE(type->typ_name, strlen(name) + 1);
255         if (type->typ_dt_ops != NULL)
256                 OBD_FREE_PTR(type->typ_dt_ops);
257         if (type->typ_md_ops != NULL)
258                 OBD_FREE_PTR(type->typ_md_ops);
259         OBD_FREE(type, sizeof(*type));
260         RETURN(0);
261 } /* class_unregister_type */
262 EXPORT_SYMBOL(class_unregister_type);
263
264 /**
265  * Create a new obd device.
266  *
267  * Find an empty slot in ::obd_devs[], create a new obd device in it.
268  *
269  * \param[in] type_name obd device type string.
270  * \param[in] name      obd device name.
271  *
272  * \retval NULL if create fails, otherwise return the obd device
273  *         pointer created.
274  */
275 struct obd_device *class_newdev(const char *type_name, const char *name)
276 {
277         struct obd_device *result = NULL;
278         struct obd_device *newdev;
279         struct obd_type *type = NULL;
280         int i;
281         int new_obd_minor = 0;
282         ENTRY;
283
284         if (strlen(name) >= MAX_OBD_NAME) {
285                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
286                 RETURN(ERR_PTR(-EINVAL));
287         }
288
289         type = class_get_type(type_name);
290         if (type == NULL){
291                 CERROR("OBD: unknown type: %s\n", type_name);
292                 RETURN(ERR_PTR(-ENODEV));
293         }
294
295         newdev = obd_device_alloc();
296         if (newdev == NULL) {
297                 class_put_type(type);
298                 RETURN(ERR_PTR(-ENOMEM));
299         }
300         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
301
302         cfs_write_lock(&obd_dev_lock);
303         for (i = 0; i < class_devno_max(); i++) {
304                 struct obd_device *obd = class_num2obd(i);
305
306                 if (obd && obd->obd_name &&
307                     (strcmp(name, obd->obd_name) == 0)) {
308                         CERROR("Device %s already exists at %d, won't add\n",
309                                name, i);
310                         if (result) {
311                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
312                                          "%p obd_magic %08x != %08x\n", result,
313                                          result->obd_magic, OBD_DEVICE_MAGIC);
314                                 LASSERTF(result->obd_minor == new_obd_minor,
315                                          "%p obd_minor %d != %d\n", result,
316                                          result->obd_minor, new_obd_minor);
317
318                                 obd_devs[result->obd_minor] = NULL;
319                                 result->obd_name[0]='\0';
320                          }
321                         result = ERR_PTR(-EEXIST);
322                         break;
323                 }
324                 if (!result && !obd) {
325                         result = newdev;
326                         result->obd_minor = i;
327                         new_obd_minor = i;
328                         result->obd_type = type;
329                         strncpy(result->obd_name, name,
330                                 sizeof(result->obd_name) - 1);
331                         obd_devs[i] = result;
332                 }
333         }
334         cfs_write_unlock(&obd_dev_lock);
335
336         if (result == NULL && i >= class_devno_max()) {
337                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
338                        class_devno_max());
339                 RETURN(ERR_PTR(-EOVERFLOW));
340         }
341
342         if (IS_ERR(result)) {
343                 obd_device_free(newdev);
344                 class_put_type(type);
345         } else {
346                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
347                        result->obd_name, result);
348         }
349         RETURN(result);
350 }
351
352 void class_release_dev(struct obd_device *obd)
353 {
354         struct obd_type *obd_type = obd->obd_type;
355
356         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
357                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
358         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
359                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
360         LASSERT(obd_type != NULL);
361
362         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
363                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
364
365         cfs_write_lock(&obd_dev_lock);
366         obd_devs[obd->obd_minor] = NULL;
367         cfs_write_unlock(&obd_dev_lock);
368         obd_device_free(obd);
369
370         class_put_type(obd_type);
371 }
372
373 int class_name2dev(const char *name)
374 {
375         int i;
376
377         if (!name)
378                 return -1;
379
380         cfs_read_lock(&obd_dev_lock);
381         for (i = 0; i < class_devno_max(); i++) {
382                 struct obd_device *obd = class_num2obd(i);
383
384                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
385                         /* Make sure we finished attaching before we give
386                            out any references */
387                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
388                         if (obd->obd_attached) {
389                                 cfs_read_unlock(&obd_dev_lock);
390                                 return i;
391                         }
392                         break;
393                 }
394         }
395         cfs_read_unlock(&obd_dev_lock);
396
397         return -1;
398 }
399 EXPORT_SYMBOL(class_name2dev);
400
401 struct obd_device *class_name2obd(const char *name)
402 {
403         int dev = class_name2dev(name);
404
405         if (dev < 0 || dev > class_devno_max())
406                 return NULL;
407         return class_num2obd(dev);
408 }
409 EXPORT_SYMBOL(class_name2obd);
410
411 int class_uuid2dev(struct obd_uuid *uuid)
412 {
413         int i;
414
415         cfs_read_lock(&obd_dev_lock);
416         for (i = 0; i < class_devno_max(); i++) {
417                 struct obd_device *obd = class_num2obd(i);
418
419                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
420                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
421                         cfs_read_unlock(&obd_dev_lock);
422                         return i;
423                 }
424         }
425         cfs_read_unlock(&obd_dev_lock);
426
427         return -1;
428 }
429 EXPORT_SYMBOL(class_uuid2dev);
430
431 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
432 {
433         int dev = class_uuid2dev(uuid);
434         if (dev < 0)
435                 return NULL;
436         return class_num2obd(dev);
437 }
438 EXPORT_SYMBOL(class_uuid2obd);
439
440 /**
441  * Get obd device from ::obd_devs[]
442  *
443  * \param num [in] array index
444  *
445  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
446  *         otherwise return the obd device there.
447  */
448 struct obd_device *class_num2obd(int num)
449 {
450         struct obd_device *obd = NULL;
451
452         if (num < class_devno_max()) {
453                 obd = obd_devs[num];
454                 if (obd == NULL)
455                         return NULL;
456
457                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
458                          "%p obd_magic %08x != %08x\n",
459                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
460                 LASSERTF(obd->obd_minor == num,
461                          "%p obd_minor %0d != %0d\n",
462                          obd, obd->obd_minor, num);
463         }
464
465         return obd;
466 }
467 EXPORT_SYMBOL(class_num2obd);
468
469 void class_obd_list(void)
470 {
471         char *status;
472         int i;
473
474         cfs_read_lock(&obd_dev_lock);
475         for (i = 0; i < class_devno_max(); i++) {
476                 struct obd_device *obd = class_num2obd(i);
477
478                 if (obd == NULL)
479                         continue;
480                 if (obd->obd_stopping)
481                         status = "ST";
482                 else if (obd->obd_set_up)
483                         status = "UP";
484                 else if (obd->obd_attached)
485                         status = "AT";
486                 else
487                         status = "--";
488                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
489                          i, status, obd->obd_type->typ_name,
490                          obd->obd_name, obd->obd_uuid.uuid,
491                          cfs_atomic_read(&obd->obd_refcount));
492         }
493         cfs_read_unlock(&obd_dev_lock);
494         return;
495 }
496
497 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
498    specified, then only the client with that uuid is returned,
499    otherwise any client connected to the tgt is returned. */
500 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
501                                           const char * typ_name,
502                                           struct obd_uuid *grp_uuid)
503 {
504         int i;
505
506         cfs_read_lock(&obd_dev_lock);
507         for (i = 0; i < class_devno_max(); i++) {
508                 struct obd_device *obd = class_num2obd(i);
509
510                 if (obd == NULL)
511                         continue;
512                 if ((strncmp(obd->obd_type->typ_name, typ_name,
513                              strlen(typ_name)) == 0)) {
514                         if (obd_uuid_equals(tgt_uuid,
515                                             &obd->u.cli.cl_target_uuid) &&
516                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
517                                                          &obd->obd_uuid) : 1)) {
518                                 cfs_read_unlock(&obd_dev_lock);
519                                 return obd;
520                         }
521                 }
522         }
523         cfs_read_unlock(&obd_dev_lock);
524
525         return NULL;
526 }
527 EXPORT_SYMBOL(class_find_client_obd);
528
529 /* Iterate the obd_device list looking devices have grp_uuid. Start
530    searching at *next, and if a device is found, the next index to look
531    at is saved in *next. If next is NULL, then the first matching device
532    will always be returned. */
533 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
534 {
535         int i;
536
537         if (next == NULL)
538                 i = 0;
539         else if (*next >= 0 && *next < class_devno_max())
540                 i = *next;
541         else
542                 return NULL;
543
544         cfs_read_lock(&obd_dev_lock);
545         for (; i < class_devno_max(); i++) {
546                 struct obd_device *obd = class_num2obd(i);
547
548                 if (obd == NULL)
549                         continue;
550                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
551                         if (next != NULL)
552                                 *next = i+1;
553                         cfs_read_unlock(&obd_dev_lock);
554                         return obd;
555                 }
556         }
557         cfs_read_unlock(&obd_dev_lock);
558
559         return NULL;
560 }
561 EXPORT_SYMBOL(class_devices_in_group);
562
563 /**
564  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
565  * adjust sptlrpc settings accordingly.
566  */
567 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
568 {
569         struct obd_device  *obd;
570         const char         *type;
571         int                 i, rc = 0, rc2;
572
573         LASSERT(namelen > 0);
574
575         cfs_read_lock(&obd_dev_lock);
576         for (i = 0; i < class_devno_max(); i++) {
577                 obd = class_num2obd(i);
578
579                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
580                         continue;
581
582                 /* only notify mdc, osc, mdt, ost */
583                 type = obd->obd_type->typ_name;
584                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
585                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
586                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
587                     strcmp(type, LUSTRE_OST_NAME) != 0)
588                         continue;
589
590                 if (strncmp(obd->obd_name, fsname, namelen))
591                         continue;
592
593                 class_incref(obd, __FUNCTION__, obd);
594                 cfs_read_unlock(&obd_dev_lock);
595                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
596                                          sizeof(KEY_SPTLRPC_CONF),
597                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
598                 rc = rc ? rc : rc2;
599                 class_decref(obd, __FUNCTION__, obd);
600                 cfs_read_lock(&obd_dev_lock);
601         }
602         cfs_read_unlock(&obd_dev_lock);
603         return rc;
604 }
605 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
606
607 void obd_cleanup_caches(void)
608 {
609         int rc;
610
611         ENTRY;
612         if (obd_device_cachep) {
613                 rc = cfs_mem_cache_destroy(obd_device_cachep);
614                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
615                 obd_device_cachep = NULL;
616         }
617         if (obdo_cachep) {
618                 rc = cfs_mem_cache_destroy(obdo_cachep);
619                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
620                 obdo_cachep = NULL;
621         }
622         if (import_cachep) {
623                 rc = cfs_mem_cache_destroy(import_cachep);
624                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
625                 import_cachep = NULL;
626         }
627         if (capa_cachep) {
628                 rc = cfs_mem_cache_destroy(capa_cachep);
629                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
630                 capa_cachep = NULL;
631         }
632         EXIT;
633 }
634
635 int obd_init_caches(void)
636 {
637         ENTRY;
638
639         LASSERT(obd_device_cachep == NULL);
640         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
641                                                  sizeof(struct obd_device),
642                                                  0, 0);
643         if (!obd_device_cachep)
644                 GOTO(out, -ENOMEM);
645
646         LASSERT(obdo_cachep == NULL);
647         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
648                                            0, 0);
649         if (!obdo_cachep)
650                 GOTO(out, -ENOMEM);
651
652         LASSERT(import_cachep == NULL);
653         import_cachep = cfs_mem_cache_create("ll_import_cache",
654                                              sizeof(struct obd_import),
655                                              0, 0);
656         if (!import_cachep)
657                 GOTO(out, -ENOMEM);
658
659         LASSERT(capa_cachep == NULL);
660         capa_cachep = cfs_mem_cache_create("capa_cache",
661                                            sizeof(struct obd_capa), 0, 0);
662         if (!capa_cachep)
663                 GOTO(out, -ENOMEM);
664
665         RETURN(0);
666  out:
667         obd_cleanup_caches();
668         RETURN(-ENOMEM);
669
670 }
671
672 /* map connection to client */
673 struct obd_export *class_conn2export(struct lustre_handle *conn)
674 {
675         struct obd_export *export;
676         ENTRY;
677
678         if (!conn) {
679                 CDEBUG(D_CACHE, "looking for null handle\n");
680                 RETURN(NULL);
681         }
682
683         if (conn->cookie == -1) {  /* this means assign a new connection */
684                 CDEBUG(D_CACHE, "want a new connection\n");
685                 RETURN(NULL);
686         }
687
688         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
689         export = class_handle2object(conn->cookie);
690         RETURN(export);
691 }
692 EXPORT_SYMBOL(class_conn2export);
693
694 struct obd_device *class_exp2obd(struct obd_export *exp)
695 {
696         if (exp)
697                 return exp->exp_obd;
698         return NULL;
699 }
700 EXPORT_SYMBOL(class_exp2obd);
701
702 struct obd_device *class_conn2obd(struct lustre_handle *conn)
703 {
704         struct obd_export *export;
705         export = class_conn2export(conn);
706         if (export) {
707                 struct obd_device *obd = export->exp_obd;
708                 class_export_put(export);
709                 return obd;
710         }
711         return NULL;
712 }
713 EXPORT_SYMBOL(class_conn2obd);
714
715 struct obd_import *class_exp2cliimp(struct obd_export *exp)
716 {
717         struct obd_device *obd = exp->exp_obd;
718         if (obd == NULL)
719                 return NULL;
720         return obd->u.cli.cl_import;
721 }
722 EXPORT_SYMBOL(class_exp2cliimp);
723
724 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
725 {
726         struct obd_device *obd = class_conn2obd(conn);
727         if (obd == NULL)
728                 return NULL;
729         return obd->u.cli.cl_import;
730 }
731 EXPORT_SYMBOL(class_conn2cliimp);
732
733 /* Export management functions */
734 static void class_export_destroy(struct obd_export *exp)
735 {
736         struct obd_device *obd = exp->exp_obd;
737         ENTRY;
738
739         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
740         LASSERT(obd != NULL);
741
742         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
743                exp->exp_client_uuid.uuid, obd->obd_name);
744
745         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
746         if (exp->exp_connection)
747                 ptlrpc_put_connection_superhack(exp->exp_connection);
748
749         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
750         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
751         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
752         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
753         obd_destroy_export(exp);
754         class_decref(obd, "export", exp);
755
756         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
757         EXIT;
758 }
759
760 static void export_handle_addref(void *export)
761 {
762         class_export_get(export);
763 }
764
765 static struct portals_handle_ops export_handle_ops = {
766         .hop_addref = export_handle_addref,
767         .hop_free   = NULL,
768 };
769
770 struct obd_export *class_export_get(struct obd_export *exp)
771 {
772         cfs_atomic_inc(&exp->exp_refcount);
773         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
774                cfs_atomic_read(&exp->exp_refcount));
775         return exp;
776 }
777 EXPORT_SYMBOL(class_export_get);
778
779 void class_export_put(struct obd_export *exp)
780 {
781         LASSERT(exp != NULL);
782         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
783         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
784                cfs_atomic_read(&exp->exp_refcount) - 1);
785
786         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
787                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
788                 CDEBUG(D_IOCTL, "final put %p/%s\n",
789                        exp, exp->exp_client_uuid.uuid);
790
791                 /* release nid stat refererence */
792                 lprocfs_exp_cleanup(exp);
793
794                 obd_zombie_export_add(exp);
795         }
796 }
797 EXPORT_SYMBOL(class_export_put);
798
799 /* Creates a new export, adds it to the hash table, and returns a
800  * pointer to it. The refcount is 2: one for the hash reference, and
801  * one for the pointer returned by this function. */
802 struct obd_export *class_new_export(struct obd_device *obd,
803                                     struct obd_uuid *cluuid)
804 {
805         struct obd_export *export;
806         cfs_hash_t *hash = NULL;
807         int rc = 0;
808         ENTRY;
809
810         OBD_ALLOC_PTR(export);
811         if (!export)
812                 return ERR_PTR(-ENOMEM);
813
814         export->exp_conn_cnt = 0;
815         export->exp_lock_hash = NULL;
816         export->exp_flock_hash = NULL;
817         cfs_atomic_set(&export->exp_refcount, 2);
818         cfs_atomic_set(&export->exp_rpc_count, 0);
819         cfs_atomic_set(&export->exp_cb_count, 0);
820         cfs_atomic_set(&export->exp_locks_count, 0);
821 #if LUSTRE_TRACKS_LOCK_EXP_REFS
822         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
823         cfs_spin_lock_init(&export->exp_locks_list_guard);
824 #endif
825         cfs_atomic_set(&export->exp_replay_count, 0);
826         export->exp_obd = obd;
827         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
828         cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
829         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
830         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
831         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
832         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
833         class_handle_hash(&export->exp_handle, &export_handle_ops);
834         export->exp_last_request_time = cfs_time_current_sec();
835         cfs_spin_lock_init(&export->exp_lock);
836         cfs_spin_lock_init(&export->exp_rpc_lock);
837         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
838         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
839         cfs_spin_lock_init(&export->exp_bl_list_lock);
840         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
841
842         export->exp_sp_peer = LUSTRE_SP_ANY;
843         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
844         export->exp_client_uuid = *cluuid;
845         obd_init_export(export);
846
847         cfs_spin_lock(&obd->obd_dev_lock);
848          /* shouldn't happen, but might race */
849         if (obd->obd_stopping)
850                 GOTO(exit_unlock, rc = -ENODEV);
851
852         hash = cfs_hash_getref(obd->obd_uuid_hash);
853         if (hash == NULL)
854                 GOTO(exit_unlock, rc = -ENODEV);
855         cfs_spin_unlock(&obd->obd_dev_lock);
856
857         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
858                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
859                 if (rc != 0) {
860                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
861                                       obd->obd_name, cluuid->uuid, rc);
862                         GOTO(exit_err, rc = -EALREADY);
863                 }
864         }
865
866         cfs_spin_lock(&obd->obd_dev_lock);
867         if (obd->obd_stopping) {
868                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
869                 GOTO(exit_unlock, rc = -ENODEV);
870         }
871
872         class_incref(obd, "export", export);
873         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
874         cfs_list_add_tail(&export->exp_obd_chain_timed,
875                           &export->exp_obd->obd_exports_timed);
876         export->exp_obd->obd_num_exports++;
877         cfs_spin_unlock(&obd->obd_dev_lock);
878         cfs_hash_putref(hash);
879         RETURN(export);
880
881 exit_unlock:
882         cfs_spin_unlock(&obd->obd_dev_lock);
883 exit_err:
884         if (hash)
885                 cfs_hash_putref(hash);
886         class_handle_unhash(&export->exp_handle);
887         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
888         obd_destroy_export(export);
889         OBD_FREE_PTR(export);
890         return ERR_PTR(rc);
891 }
892 EXPORT_SYMBOL(class_new_export);
893
894 void class_unlink_export(struct obd_export *exp)
895 {
896         class_handle_unhash(&exp->exp_handle);
897
898         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
899         /* delete an uuid-export hashitem from hashtables */
900         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
901                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
902                              &exp->exp_client_uuid,
903                              &exp->exp_uuid_hash);
904
905         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
906         cfs_list_del_init(&exp->exp_obd_chain_timed);
907         exp->exp_obd->obd_num_exports--;
908         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
909         class_export_put(exp);
910 }
911 EXPORT_SYMBOL(class_unlink_export);
912
913 /* Import management functions */
914 void class_import_destroy(struct obd_import *imp)
915 {
916         ENTRY;
917
918         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
919                 imp->imp_obd->obd_name);
920
921         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
922
923         ptlrpc_put_connection_superhack(imp->imp_connection);
924
925         while (!cfs_list_empty(&imp->imp_conn_list)) {
926                 struct obd_import_conn *imp_conn;
927
928                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
929                                           struct obd_import_conn, oic_item);
930                 cfs_list_del_init(&imp_conn->oic_item);
931                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
932                 OBD_FREE(imp_conn, sizeof(*imp_conn));
933         }
934
935         LASSERT(imp->imp_sec == NULL);
936         class_decref(imp->imp_obd, "import", imp);
937         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
938         EXIT;
939 }
940
941 static void import_handle_addref(void *import)
942 {
943         class_import_get(import);
944 }
945
946 static struct portals_handle_ops import_handle_ops = {
947         .hop_addref = import_handle_addref,
948         .hop_free   = NULL,
949 };
950
951 struct obd_import *class_import_get(struct obd_import *import)
952 {
953         cfs_atomic_inc(&import->imp_refcount);
954         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
955                cfs_atomic_read(&import->imp_refcount),
956                import->imp_obd->obd_name);
957         return import;
958 }
959 EXPORT_SYMBOL(class_import_get);
960
961 void class_import_put(struct obd_import *imp)
962 {
963         ENTRY;
964
965         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
966         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
967
968         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
969                cfs_atomic_read(&imp->imp_refcount) - 1,
970                imp->imp_obd->obd_name);
971
972         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
973                 CDEBUG(D_INFO, "final put import %p\n", imp);
974                 obd_zombie_import_add(imp);
975         }
976
977         /* catch possible import put race */
978         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
979         EXIT;
980 }
981 EXPORT_SYMBOL(class_import_put);
982
983 static void init_imp_at(struct imp_at *at) {
984         int i;
985         at_init(&at->iat_net_latency, 0, 0);
986         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
987                 /* max service estimates are tracked on the server side, so
988                    don't use the AT history here, just use the last reported
989                    val. (But keep hist for proc histogram, worst_ever) */
990                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
991                         AT_FLG_NOHIST);
992         }
993 }
994
995 struct obd_import *class_new_import(struct obd_device *obd)
996 {
997         struct obd_import *imp;
998
999         OBD_ALLOC(imp, sizeof(*imp));
1000         if (imp == NULL)
1001                 return NULL;
1002
1003         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1004         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1005         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1006         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1007         cfs_spin_lock_init(&imp->imp_lock);
1008         imp->imp_last_success_conn = 0;
1009         imp->imp_state = LUSTRE_IMP_NEW;
1010         imp->imp_obd = class_incref(obd, "import", imp);
1011         cfs_mutex_init(&imp->imp_sec_mutex);
1012         cfs_waitq_init(&imp->imp_recovery_waitq);
1013
1014         cfs_atomic_set(&imp->imp_refcount, 2);
1015         cfs_atomic_set(&imp->imp_unregistering, 0);
1016         cfs_atomic_set(&imp->imp_inflight, 0);
1017         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1018         cfs_atomic_set(&imp->imp_inval_count, 0);
1019         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1020         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1021         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1022         init_imp_at(&imp->imp_at);
1023
1024         /* the default magic is V2, will be used in connect RPC, and
1025          * then adjusted according to the flags in request/reply. */
1026         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1027
1028         return imp;
1029 }
1030 EXPORT_SYMBOL(class_new_import);
1031
1032 void class_destroy_import(struct obd_import *import)
1033 {
1034         LASSERT(import != NULL);
1035         LASSERT(import != LP_POISON);
1036
1037         class_handle_unhash(&import->imp_handle);
1038
1039         cfs_spin_lock(&import->imp_lock);
1040         import->imp_generation++;
1041         cfs_spin_unlock(&import->imp_lock);
1042         class_import_put(import);
1043 }
1044 EXPORT_SYMBOL(class_destroy_import);
1045
1046 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1047
1048 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1049 {
1050         cfs_spin_lock(&exp->exp_locks_list_guard);
1051
1052         LASSERT(lock->l_exp_refs_nr >= 0);
1053
1054         if (lock->l_exp_refs_target != NULL &&
1055             lock->l_exp_refs_target != exp) {
1056                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1057                               exp, lock, lock->l_exp_refs_target);
1058         }
1059         if ((lock->l_exp_refs_nr ++) == 0) {
1060                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1061                 lock->l_exp_refs_target = exp;
1062         }
1063         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1064                lock, exp, lock->l_exp_refs_nr);
1065         cfs_spin_unlock(&exp->exp_locks_list_guard);
1066 }
1067 EXPORT_SYMBOL(__class_export_add_lock_ref);
1068
1069 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1070 {
1071         cfs_spin_lock(&exp->exp_locks_list_guard);
1072         LASSERT(lock->l_exp_refs_nr > 0);
1073         if (lock->l_exp_refs_target != exp) {
1074                 LCONSOLE_WARN("lock %p, "
1075                               "mismatching export pointers: %p, %p\n",
1076                               lock, lock->l_exp_refs_target, exp);
1077         }
1078         if (-- lock->l_exp_refs_nr == 0) {
1079                 cfs_list_del_init(&lock->l_exp_refs_link);
1080                 lock->l_exp_refs_target = NULL;
1081         }
1082         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1083                lock, exp, lock->l_exp_refs_nr);
1084         cfs_spin_unlock(&exp->exp_locks_list_guard);
1085 }
1086 EXPORT_SYMBOL(__class_export_del_lock_ref);
1087 #endif
1088
1089 /* A connection defines an export context in which preallocation can
1090    be managed. This releases the export pointer reference, and returns
1091    the export handle, so the export refcount is 1 when this function
1092    returns. */
1093 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1094                   struct obd_uuid *cluuid)
1095 {
1096         struct obd_export *export;
1097         LASSERT(conn != NULL);
1098         LASSERT(obd != NULL);
1099         LASSERT(cluuid != NULL);
1100         ENTRY;
1101
1102         export = class_new_export(obd, cluuid);
1103         if (IS_ERR(export))
1104                 RETURN(PTR_ERR(export));
1105
1106         conn->cookie = export->exp_handle.h_cookie;
1107         class_export_put(export);
1108
1109         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1110                cluuid->uuid, conn->cookie);
1111         RETURN(0);
1112 }
1113 EXPORT_SYMBOL(class_connect);
1114
1115 /* if export is involved in recovery then clean up related things */
1116 void class_export_recovery_cleanup(struct obd_export *exp)
1117 {
1118         struct obd_device *obd = exp->exp_obd;
1119
1120         cfs_spin_lock(&obd->obd_recovery_task_lock);
1121         if (exp->exp_delayed)
1122                 obd->obd_delayed_clients--;
1123         if (obd->obd_recovering && exp->exp_in_recovery) {
1124                 cfs_spin_lock(&exp->exp_lock);
1125                 exp->exp_in_recovery = 0;
1126                 cfs_spin_unlock(&exp->exp_lock);
1127                 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1128                 cfs_atomic_dec(&obd->obd_connected_clients);
1129         }
1130         cfs_spin_unlock(&obd->obd_recovery_task_lock);
1131         /** Cleanup req replay fields */
1132         if (exp->exp_req_replay_needed) {
1133                 cfs_spin_lock(&exp->exp_lock);
1134                 exp->exp_req_replay_needed = 0;
1135                 cfs_spin_unlock(&exp->exp_lock);
1136                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1137                 cfs_atomic_dec(&obd->obd_req_replay_clients);
1138         }
1139         /** Cleanup lock replay data */
1140         if (exp->exp_lock_replay_needed) {
1141                 cfs_spin_lock(&exp->exp_lock);
1142                 exp->exp_lock_replay_needed = 0;
1143                 cfs_spin_unlock(&exp->exp_lock);
1144                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1145                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1146         }
1147 }
1148
1149 /* This function removes 1-3 references from the export:
1150  * 1 - for export pointer passed
1151  * and if disconnect really need
1152  * 2 - removing from hash
1153  * 3 - in client_unlink_export
1154  * The export pointer passed to this function can destroyed */
1155 int class_disconnect(struct obd_export *export)
1156 {
1157         int already_disconnected;
1158         ENTRY;
1159
1160         if (export == NULL) {
1161                 CWARN("attempting to free NULL export %p\n", export);
1162                 RETURN(-EINVAL);
1163         }
1164
1165         cfs_spin_lock(&export->exp_lock);
1166         already_disconnected = export->exp_disconnected;
1167         export->exp_disconnected = 1;
1168         cfs_spin_unlock(&export->exp_lock);
1169
1170         /* class_cleanup(), abort_recovery(), and class_fail_export()
1171          * all end up in here, and if any of them race we shouldn't
1172          * call extra class_export_puts(). */
1173         if (already_disconnected) {
1174                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1175                 GOTO(no_disconn, already_disconnected);
1176         }
1177
1178         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1179                export->exp_handle.h_cookie);
1180
1181         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1182                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1183                              &export->exp_connection->c_peer.nid,
1184                              &export->exp_nid_hash);
1185
1186         class_export_recovery_cleanup(export);
1187         class_unlink_export(export);
1188 no_disconn:
1189         class_export_put(export);
1190         RETURN(0);
1191 }
1192 EXPORT_SYMBOL(class_disconnect);
1193
1194 /* Return non-zero for a fully connected export */
1195 int class_connected_export(struct obd_export *exp)
1196 {
1197         if (exp) {
1198                 int connected;
1199                 cfs_spin_lock(&exp->exp_lock);
1200                 connected = (exp->exp_conn_cnt > 0);
1201                 cfs_spin_unlock(&exp->exp_lock);
1202                 return connected;
1203         }
1204         return 0;
1205 }
1206 EXPORT_SYMBOL(class_connected_export);
1207
1208 static void class_disconnect_export_list(cfs_list_t *list,
1209                                          enum obd_option flags)
1210 {
1211         int rc;
1212         struct obd_export *exp;
1213         ENTRY;
1214
1215         /* It's possible that an export may disconnect itself, but
1216          * nothing else will be added to this list. */
1217         while (!cfs_list_empty(list)) {
1218                 exp = cfs_list_entry(list->next, struct obd_export,
1219                                      exp_obd_chain);
1220                 /* need for safe call CDEBUG after obd_disconnect */
1221                 class_export_get(exp);
1222
1223                 cfs_spin_lock(&exp->exp_lock);
1224                 exp->exp_flags = flags;
1225                 cfs_spin_unlock(&exp->exp_lock);
1226
1227                 if (obd_uuid_equals(&exp->exp_client_uuid,
1228                                     &exp->exp_obd->obd_uuid)) {
1229                         CDEBUG(D_HA,
1230                                "exp %p export uuid == obd uuid, don't discon\n",
1231                                exp);
1232                         /* Need to delete this now so we don't end up pointing
1233                          * to work_list later when this export is cleaned up. */
1234                         cfs_list_del_init(&exp->exp_obd_chain);
1235                         class_export_put(exp);
1236                         continue;
1237                 }
1238
1239                 class_export_get(exp);
1240                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1241                        "last request at "CFS_TIME_T"\n",
1242                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1243                        exp, exp->exp_last_request_time);
1244                 /* release one export reference anyway */
1245                 rc = obd_disconnect(exp);
1246
1247                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1248                        obd_export_nid2str(exp), exp, rc);
1249                 class_export_put(exp);
1250         }
1251         EXIT;
1252 }
1253
1254 void class_disconnect_exports(struct obd_device *obd)
1255 {
1256         cfs_list_t work_list;
1257         ENTRY;
1258
1259         /* Move all of the exports from obd_exports to a work list, en masse. */
1260         CFS_INIT_LIST_HEAD(&work_list);
1261         cfs_spin_lock(&obd->obd_dev_lock);
1262         cfs_list_splice_init(&obd->obd_exports, &work_list);
1263         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1264         cfs_spin_unlock(&obd->obd_dev_lock);
1265
1266         if (!cfs_list_empty(&work_list)) {
1267                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1268                        "disconnecting them\n", obd->obd_minor, obd);
1269                 class_disconnect_export_list(&work_list,
1270                                              exp_flags_from_obd(obd));
1271         } else
1272                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1273                        obd->obd_minor, obd);
1274         EXIT;
1275 }
1276 EXPORT_SYMBOL(class_disconnect_exports);
1277
1278 /* Remove exports that have not completed recovery.
1279  */
1280 void class_disconnect_stale_exports(struct obd_device *obd,
1281                                     int (*test_export)(struct obd_export *))
1282 {
1283         cfs_list_t work_list;
1284         struct obd_export *exp, *n;
1285         int evicted = 0;
1286         ENTRY;
1287
1288         CFS_INIT_LIST_HEAD(&work_list);
1289         cfs_spin_lock(&obd->obd_dev_lock);
1290         cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1291                                      exp_obd_chain) {
1292                 /* don't count self-export as client */
1293                 if (obd_uuid_equals(&exp->exp_client_uuid,
1294                                     &exp->exp_obd->obd_uuid))
1295                         continue;
1296
1297                 cfs_spin_lock(&exp->exp_lock);
1298                 if (test_export(exp)) {
1299                         cfs_spin_unlock(&exp->exp_lock);
1300                         continue;
1301                 }
1302                 exp->exp_failed = 1;
1303                 cfs_spin_unlock(&exp->exp_lock);
1304
1305                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1306                 evicted++;
1307                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1308                        obd->obd_name, exp->exp_client_uuid.uuid,
1309                        exp->exp_connection == NULL ? "<unknown>" :
1310                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1311                 print_export_data(exp, "EVICTING", 0);
1312         }
1313         cfs_spin_unlock(&obd->obd_dev_lock);
1314
1315         if (evicted) {
1316                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1317                               obd->obd_name, evicted);
1318                 obd->obd_stale_clients += evicted;
1319         }
1320         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1321                                                  OBD_OPT_ABORT_RECOV);
1322         EXIT;
1323 }
1324 EXPORT_SYMBOL(class_disconnect_stale_exports);
1325
1326 void class_fail_export(struct obd_export *exp)
1327 {
1328         int rc, already_failed;
1329
1330         cfs_spin_lock(&exp->exp_lock);
1331         already_failed = exp->exp_failed;
1332         exp->exp_failed = 1;
1333         cfs_spin_unlock(&exp->exp_lock);
1334
1335         if (already_failed) {
1336                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1337                        exp, exp->exp_client_uuid.uuid);
1338                 return;
1339         }
1340
1341         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1342                exp, exp->exp_client_uuid.uuid);
1343
1344         if (obd_dump_on_timeout)
1345                 libcfs_debug_dumplog();
1346
1347         /* need for safe call CDEBUG after obd_disconnect */
1348         class_export_get(exp);
1349
1350         /* Most callers into obd_disconnect are removing their own reference
1351          * (request, for example) in addition to the one from the hash table.
1352          * We don't have such a reference here, so make one. */
1353         class_export_get(exp);
1354         rc = obd_disconnect(exp);
1355         if (rc)
1356                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1357         else
1358                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1359                        exp, exp->exp_client_uuid.uuid);
1360         class_export_put(exp);
1361 }
1362 EXPORT_SYMBOL(class_fail_export);
1363
1364 char *obd_export_nid2str(struct obd_export *exp)
1365 {
1366         if (exp->exp_connection != NULL)
1367                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1368
1369         return "(no nid)";
1370 }
1371 EXPORT_SYMBOL(obd_export_nid2str);
1372
1373 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1374 {
1375         struct obd_export *doomed_exp = NULL;
1376         int exports_evicted = 0;
1377
1378         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1379
1380         do {
1381                 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1382                 if (doomed_exp == NULL)
1383                         break;
1384
1385                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1386                          "nid %s found, wanted nid %s, requested nid %s\n",
1387                          obd_export_nid2str(doomed_exp),
1388                          libcfs_nid2str(nid_key), nid);
1389                 LASSERTF(doomed_exp != obd->obd_self_export,
1390                          "self-export is hashed by NID?\n");
1391                 exports_evicted++;
1392                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1393                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1394                        exports_evicted);
1395                 class_fail_export(doomed_exp);
1396                 class_export_put(doomed_exp);
1397         } while (1);
1398
1399         if (!exports_evicted)
1400                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1401                        obd->obd_name, nid);
1402         return exports_evicted;
1403 }
1404 EXPORT_SYMBOL(obd_export_evict_by_nid);
1405
1406 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1407 {
1408         struct obd_export *doomed_exp = NULL;
1409         struct obd_uuid doomed_uuid;
1410         int exports_evicted = 0;
1411
1412         obd_str2uuid(&doomed_uuid, uuid);
1413         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1414                 CERROR("%s: can't evict myself\n", obd->obd_name);
1415                 return exports_evicted;
1416         }
1417
1418         doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1419
1420         if (doomed_exp == NULL) {
1421                 CERROR("%s: can't disconnect %s: no exports found\n",
1422                        obd->obd_name, uuid);
1423         } else {
1424                 CWARN("%s: evicting %s at adminstrative request\n",
1425                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1426                 class_fail_export(doomed_exp);
1427                 class_export_put(doomed_exp);
1428                 exports_evicted++;
1429         }
1430
1431         return exports_evicted;
1432 }
1433 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1434
1435 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1436 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1437 EXPORT_SYMBOL(class_export_dump_hook);
1438 #endif
1439
1440 static void print_export_data(struct obd_export *exp, const char *status,
1441                               int locks)
1442 {
1443         struct ptlrpc_reply_state *rs;
1444         struct ptlrpc_reply_state *first_reply = NULL;
1445         int nreplies = 0;
1446
1447         cfs_spin_lock(&exp->exp_lock);
1448         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1449                                 rs_exp_list) {
1450                 if (nreplies == 0)
1451                         first_reply = rs;
1452                 nreplies++;
1453         }
1454         cfs_spin_unlock(&exp->exp_lock);
1455
1456         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1457                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1458                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1459                cfs_atomic_read(&exp->exp_rpc_count),
1460                cfs_atomic_read(&exp->exp_cb_count),
1461                cfs_atomic_read(&exp->exp_locks_count),
1462                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1463                nreplies, first_reply, nreplies > 3 ? "..." : "",
1464                exp->exp_last_committed);
1465 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1466         if (locks && class_export_dump_hook != NULL)
1467                 class_export_dump_hook(exp);
1468 #endif
1469 }
1470
1471 void dump_exports(struct obd_device *obd, int locks)
1472 {
1473         struct obd_export *exp;
1474
1475         cfs_spin_lock(&obd->obd_dev_lock);
1476         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1477                 print_export_data(exp, "ACTIVE", locks);
1478         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1479                 print_export_data(exp, "UNLINKED", locks);
1480         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1481                 print_export_data(exp, "DELAYED", locks);
1482         cfs_spin_unlock(&obd->obd_dev_lock);
1483         cfs_spin_lock(&obd_zombie_impexp_lock);
1484         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1485                 print_export_data(exp, "ZOMBIE", locks);
1486         cfs_spin_unlock(&obd_zombie_impexp_lock);
1487 }
1488 EXPORT_SYMBOL(dump_exports);
1489
1490 void obd_exports_barrier(struct obd_device *obd)
1491 {
1492         int waited = 2;
1493         LASSERT(cfs_list_empty(&obd->obd_exports));
1494         cfs_spin_lock(&obd->obd_dev_lock);
1495         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1496                 cfs_spin_unlock(&obd->obd_dev_lock);
1497                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1498                                                    cfs_time_seconds(waited));
1499                 if (waited > 5 && IS_PO2(waited)) {
1500                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1501                                       "more than %d seconds. "
1502                                       "The obd refcount = %d. Is it stuck?\n",
1503                                       obd->obd_name, waited,
1504                                       cfs_atomic_read(&obd->obd_refcount));
1505                         dump_exports(obd, 1);
1506                 }
1507                 waited *= 2;
1508                 cfs_spin_lock(&obd->obd_dev_lock);
1509         }
1510         cfs_spin_unlock(&obd->obd_dev_lock);
1511 }
1512 EXPORT_SYMBOL(obd_exports_barrier);
1513
1514 /* Total amount of zombies to be destroyed */
1515 static int zombies_count = 0;
1516
1517 /**
1518  * kill zombie imports and exports
1519  */
1520 void obd_zombie_impexp_cull(void)
1521 {
1522         struct obd_import *import;
1523         struct obd_export *export;
1524         ENTRY;
1525
1526         do {
1527                 cfs_spin_lock(&obd_zombie_impexp_lock);
1528
1529                 import = NULL;
1530                 if (!cfs_list_empty(&obd_zombie_imports)) {
1531                         import = cfs_list_entry(obd_zombie_imports.next,
1532                                                 struct obd_import,
1533                                                 imp_zombie_chain);
1534                         cfs_list_del_init(&import->imp_zombie_chain);
1535                 }
1536
1537                 export = NULL;
1538                 if (!cfs_list_empty(&obd_zombie_exports)) {
1539                         export = cfs_list_entry(obd_zombie_exports.next,
1540                                                 struct obd_export,
1541                                                 exp_obd_chain);
1542                         cfs_list_del_init(&export->exp_obd_chain);
1543                 }
1544
1545                 cfs_spin_unlock(&obd_zombie_impexp_lock);
1546
1547                 if (import != NULL) {
1548                         class_import_destroy(import);
1549                         cfs_spin_lock(&obd_zombie_impexp_lock);
1550                         zombies_count--;
1551                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1552                 }
1553
1554                 if (export != NULL) {
1555                         class_export_destroy(export);
1556                         cfs_spin_lock(&obd_zombie_impexp_lock);
1557                         zombies_count--;
1558                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1559                 }
1560
1561                 cfs_cond_resched();
1562         } while (import != NULL || export != NULL);
1563         EXIT;
1564 }
1565
1566 static cfs_completion_t         obd_zombie_start;
1567 static cfs_completion_t         obd_zombie_stop;
1568 static unsigned long            obd_zombie_flags;
1569 static cfs_waitq_t              obd_zombie_waitq;
1570 static pid_t                    obd_zombie_pid;
1571
1572 enum {
1573         OBD_ZOMBIE_STOP   = 1 << 1
1574 };
1575
1576 /**
1577  * check for work for kill zombie import/export thread.
1578  */
1579 static int obd_zombie_impexp_check(void *arg)
1580 {
1581         int rc;
1582
1583         cfs_spin_lock(&obd_zombie_impexp_lock);
1584         rc = (zombies_count == 0) &&
1585              !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1586         cfs_spin_unlock(&obd_zombie_impexp_lock);
1587
1588         RETURN(rc);
1589 }
1590
1591 /**
1592  * Add export to the obd_zombe thread and notify it.
1593  */
1594 static void obd_zombie_export_add(struct obd_export *exp) {
1595         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1596         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1597         cfs_list_del_init(&exp->exp_obd_chain);
1598         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1599         cfs_spin_lock(&obd_zombie_impexp_lock);
1600         zombies_count++;
1601         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1602         cfs_spin_unlock(&obd_zombie_impexp_lock);
1603
1604         obd_zombie_impexp_notify();
1605 }
1606
1607 /**
1608  * Add import to the obd_zombe thread and notify it.
1609  */
1610 static void obd_zombie_import_add(struct obd_import *imp) {
1611         LASSERT(imp->imp_sec == NULL);
1612         LASSERT(imp->imp_rq_pool == NULL);
1613         cfs_spin_lock(&obd_zombie_impexp_lock);
1614         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1615         zombies_count++;
1616         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1617         cfs_spin_unlock(&obd_zombie_impexp_lock);
1618
1619         obd_zombie_impexp_notify();
1620 }
1621
1622 /**
1623  * notify import/export destroy thread about new zombie.
1624  */
1625 static void obd_zombie_impexp_notify(void)
1626 {
1627         /*
1628          * Make sure obd_zomebie_impexp_thread get this notification.
1629          * It is possible this signal only get by obd_zombie_barrier, and
1630          * barrier gulps this notification and sleeps away and hangs ensues
1631          */
1632         cfs_waitq_broadcast(&obd_zombie_waitq);
1633 }
1634
1635 /**
1636  * check whether obd_zombie is idle
1637  */
1638 static int obd_zombie_is_idle(void)
1639 {
1640         int rc;
1641
1642         LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1643         cfs_spin_lock(&obd_zombie_impexp_lock);
1644         rc = (zombies_count == 0);
1645         cfs_spin_unlock(&obd_zombie_impexp_lock);
1646         return rc;
1647 }
1648
1649 /**
1650  * wait when obd_zombie import/export queues become empty
1651  */
1652 void obd_zombie_barrier(void)
1653 {
1654         struct l_wait_info lwi = { 0 };
1655
1656         if (obd_zombie_pid == cfs_curproc_pid())
1657                 /* don't wait for myself */
1658                 return;
1659         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1660 }
1661 EXPORT_SYMBOL(obd_zombie_barrier);
1662
1663 #ifdef __KERNEL__
1664
1665 /**
1666  * destroy zombie export/import thread.
1667  */
1668 static int obd_zombie_impexp_thread(void *unused)
1669 {
1670         int rc;
1671
1672         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1673                 cfs_complete(&obd_zombie_start);
1674                 RETURN(rc);
1675         }
1676
1677         cfs_complete(&obd_zombie_start);
1678
1679         obd_zombie_pid = cfs_curproc_pid();
1680
1681         while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1682                 struct l_wait_info lwi = { 0 };
1683
1684                 l_wait_event(obd_zombie_waitq,
1685                              !obd_zombie_impexp_check(NULL), &lwi);
1686                 obd_zombie_impexp_cull();
1687
1688                 /*
1689                  * Notify obd_zombie_barrier callers that queues
1690                  * may be empty.
1691                  */
1692                 cfs_waitq_signal(&obd_zombie_waitq);
1693         }
1694
1695         cfs_complete(&obd_zombie_stop);
1696
1697         RETURN(0);
1698 }
1699
1700 #else /* ! KERNEL */
1701
1702 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1703 static void *obd_zombie_impexp_work_cb;
1704 static void *obd_zombie_impexp_idle_cb;
1705
1706 int obd_zombie_impexp_kill(void *arg)
1707 {
1708         int rc = 0;
1709
1710         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1711                 obd_zombie_impexp_cull();
1712                 rc = 1;
1713         }
1714         cfs_atomic_dec(&zombie_recur);
1715         return rc;
1716 }
1717
1718 #endif
1719
1720 /**
1721  * start destroy zombie import/export thread
1722  */
1723 int obd_zombie_impexp_init(void)
1724 {
1725         int rc;
1726
1727         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1728         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1729         cfs_spin_lock_init(&obd_zombie_impexp_lock);
1730         cfs_init_completion(&obd_zombie_start);
1731         cfs_init_completion(&obd_zombie_stop);
1732         cfs_waitq_init(&obd_zombie_waitq);
1733         obd_zombie_pid = 0;
1734
1735 #ifdef __KERNEL__
1736         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1737         if (rc < 0)
1738                 RETURN(rc);
1739
1740         cfs_wait_for_completion(&obd_zombie_start);
1741 #else
1742
1743         obd_zombie_impexp_work_cb =
1744                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1745                                                  &obd_zombie_impexp_kill, NULL);
1746
1747         obd_zombie_impexp_idle_cb =
1748                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1749                                                  &obd_zombie_impexp_check, NULL);
1750         rc = 0;
1751 #endif
1752         RETURN(rc);
1753 }
1754 /**
1755  * stop destroy zombie import/export thread
1756  */
1757 void obd_zombie_impexp_stop(void)
1758 {
1759         cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1760         obd_zombie_impexp_notify();
1761 #ifdef __KERNEL__
1762         cfs_wait_for_completion(&obd_zombie_stop);
1763 #else
1764         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1765         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1766 #endif
1767 }
1768
1769 /***** Kernel-userspace comm helpers *******/
1770
1771 /* Get length of entire message, including header */
1772 int kuc_len(int payload_len)
1773 {
1774         return sizeof(struct kuc_hdr) + payload_len;
1775 }
1776 EXPORT_SYMBOL(kuc_len);
1777
1778 /* Get a pointer to kuc header, given a ptr to the payload
1779  * @param p Pointer to payload area
1780  * @returns Pointer to kuc header
1781  */
1782 struct kuc_hdr * kuc_ptr(void *p)
1783 {
1784         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1785         LASSERT(lh->kuc_magic == KUC_MAGIC);
1786         return lh;
1787 }
1788 EXPORT_SYMBOL(kuc_ptr);
1789
1790 /* Test if payload is part of kuc message
1791  * @param p Pointer to payload area
1792  * @returns boolean
1793  */
1794 int kuc_ispayload(void *p)
1795 {
1796         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1797
1798         if (kh->kuc_magic == KUC_MAGIC)
1799                 return 1;
1800         else
1801                 return 0;
1802 }
1803 EXPORT_SYMBOL(kuc_ispayload);
1804
1805 /* Alloc space for a message, and fill in header
1806  * @return Pointer to payload area
1807  */
1808 void *kuc_alloc(int payload_len, int transport, int type)
1809 {
1810         struct kuc_hdr *lh;
1811         int len = kuc_len(payload_len);
1812
1813         OBD_ALLOC(lh, len);
1814         if (lh == NULL)
1815                 return ERR_PTR(-ENOMEM);
1816
1817         lh->kuc_magic = KUC_MAGIC;
1818         lh->kuc_transport = transport;
1819         lh->kuc_msgtype = type;
1820         lh->kuc_msglen = len;
1821
1822         return (void *)(lh + 1);
1823 }
1824 EXPORT_SYMBOL(kuc_alloc);
1825
/* Free a kuc message
 * @param p Pointer to payload area; kuc_ptr() asserts that a kuc header
 *          with KUC_MAGIC precedes it
 * @param payload_len Length of the payload area; the number of bytes
 *                    released is kuc_len(payload_len)
 *
 * Deliberately not declared "inline": the function is exported with
 * EXPORT_SYMBOL, so an out-of-line external definition must exist, and a
 * plain "inline" definition does not provide one under C99 inline rules.
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1833
1834
1835