Whamcloud - gitweb
36d8b44e92f658b5d4c759110bcd3a77ebe3ceff
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011 Whamcloud, Inc.
33  *
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  *
39  * lustre/obdclass/genops.c
40  *
41  * These are the only exported functions, they provide some generic
42  * infrastructure for managing object devices
43  */
44
45 #define DEBUG_SUBSYSTEM S_CLASS
46 #ifndef __KERNEL__
47 #include <liblustre.h>
48 #endif
49 #include <obd_ost.h>
50 #include <obd_class.h>
51 #include <lprocfs_status.h>
52
53 extern cfs_list_t obd_types;
54 cfs_spinlock_t obd_types_lock;
55
56 cfs_mem_cache_t *obd_device_cachep;
57 cfs_mem_cache_t *obdo_cachep;
58 EXPORT_SYMBOL(obdo_cachep);
59 cfs_mem_cache_t *import_cachep;
60
61 cfs_list_t      obd_zombie_imports;
62 cfs_list_t      obd_zombie_exports;
63 cfs_spinlock_t  obd_zombie_impexp_lock;
64 static void obd_zombie_impexp_notify(void);
65 static void obd_zombie_export_add(struct obd_export *exp);
66 static void obd_zombie_import_add(struct obd_import *imp);
67 static void print_export_data(struct obd_export *exp,
68                               const char *status, int locks);
69
70 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
71
72 /*
73  * support functions: we could use inter-module communication, but this
74  * is more portable to other OS's
75  */
76 static struct obd_device *obd_device_alloc(void)
77 {
78         struct obd_device *obd;
79
80         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
81         if (obd != NULL) {
82                 obd->obd_magic = OBD_DEVICE_MAGIC;
83         }
84         return obd;
85 }
86
87 static void obd_device_free(struct obd_device *obd)
88 {
89         LASSERT(obd != NULL);
90         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
91                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
92         if (obd->obd_namespace != NULL) {
93                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
94                        obd, obd->obd_namespace, obd->obd_force);
95                 LBUG();
96         }
97         lu_ref_fini(&obd->obd_reference);
98         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
99 }
100
101 struct obd_type *class_search_type(const char *name)
102 {
103         cfs_list_t *tmp;
104         struct obd_type *type;
105
106         cfs_spin_lock(&obd_types_lock);
107         cfs_list_for_each(tmp, &obd_types) {
108                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
109                 if (strcmp(type->typ_name, name) == 0) {
110                         cfs_spin_unlock(&obd_types_lock);
111                         return type;
112                 }
113         }
114         cfs_spin_unlock(&obd_types_lock);
115         return NULL;
116 }
117
118 struct obd_type *class_get_type(const char *name)
119 {
120         struct obd_type *type = class_search_type(name);
121
122 #ifdef HAVE_MODULE_LOADING_SUPPORT
123         if (!type) {
124                 const char *modname = name;
125                 if (!cfs_request_module("%s", modname)) {
126                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
127                         type = class_search_type(name);
128                 } else {
129                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
130                                            modname);
131                 }
132         }
133 #endif
134         if (type) {
135                 cfs_spin_lock(&type->obd_type_lock);
136                 type->typ_refcnt++;
137                 cfs_try_module_get(type->typ_dt_ops->o_owner);
138                 cfs_spin_unlock(&type->obd_type_lock);
139         }
140         return type;
141 }
142 EXPORT_SYMBOL(class_get_type);
143
144 void class_put_type(struct obd_type *type)
145 {
146         LASSERT(type);
147         cfs_spin_lock(&type->obd_type_lock);
148         type->typ_refcnt--;
149         cfs_module_put(type->typ_dt_ops->o_owner);
150         cfs_spin_unlock(&type->obd_type_lock);
151 }
152 EXPORT_SYMBOL(class_put_type);
153
154 #define CLASS_MAX_NAME 1024
155
156 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
157                         struct lprocfs_vars *vars, const char *name,
158                         struct lu_device_type *ldt)
159 {
160         struct obd_type *type;
161         int rc = 0;
162         ENTRY;
163
164         /* sanity check */
165         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
166
167         if (class_search_type(name)) {
168                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
169                 RETURN(-EEXIST);
170         }
171
172         rc = -ENOMEM;
173         OBD_ALLOC(type, sizeof(*type));
174         if (type == NULL)
175                 RETURN(rc);
176
177         OBD_ALLOC_PTR(type->typ_dt_ops);
178         OBD_ALLOC_PTR(type->typ_md_ops);
179         OBD_ALLOC(type->typ_name, strlen(name) + 1);
180
181         if (type->typ_dt_ops == NULL ||
182             type->typ_md_ops == NULL ||
183             type->typ_name == NULL)
184                 GOTO (failed, rc);
185
186         *(type->typ_dt_ops) = *dt_ops;
187         /* md_ops is optional */
188         if (md_ops)
189                 *(type->typ_md_ops) = *md_ops;
190         strcpy(type->typ_name, name);
191         cfs_spin_lock_init(&type->obd_type_lock);
192
193 #ifdef LPROCFS
194         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
195                                               vars, type);
196         if (IS_ERR(type->typ_procroot)) {
197                 rc = PTR_ERR(type->typ_procroot);
198                 type->typ_procroot = NULL;
199                 GOTO (failed, rc);
200         }
201 #endif
202         if (ldt != NULL) {
203                 type->typ_lu = ldt;
204                 rc = lu_device_type_init(ldt);
205                 if (rc != 0)
206                         GOTO (failed, rc);
207         }
208
209         cfs_spin_lock(&obd_types_lock);
210         cfs_list_add(&type->typ_chain, &obd_types);
211         cfs_spin_unlock(&obd_types_lock);
212
213         RETURN (0);
214
215  failed:
216         if (type->typ_name != NULL)
217                 OBD_FREE(type->typ_name, strlen(name) + 1);
218         if (type->typ_md_ops != NULL)
219                 OBD_FREE_PTR(type->typ_md_ops);
220         if (type->typ_dt_ops != NULL)
221                 OBD_FREE_PTR(type->typ_dt_ops);
222         OBD_FREE(type, sizeof(*type));
223         RETURN(rc);
224 }
225 EXPORT_SYMBOL(class_register_type);
226
227 int class_unregister_type(const char *name)
228 {
229         struct obd_type *type = class_search_type(name);
230         ENTRY;
231
232         if (!type) {
233                 CERROR("unknown obd type\n");
234                 RETURN(-EINVAL);
235         }
236
237         if (type->typ_refcnt) {
238                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
239                 /* This is a bad situation, let's make the best of it */
240                 /* Remove ops, but leave the name for debugging */
241                 OBD_FREE_PTR(type->typ_dt_ops);
242                 OBD_FREE_PTR(type->typ_md_ops);
243                 RETURN(-EBUSY);
244         }
245
246         if (type->typ_procroot) {
247                 lprocfs_remove(&type->typ_procroot);
248         }
249
250         if (type->typ_lu)
251                 lu_device_type_fini(type->typ_lu);
252
253         cfs_spin_lock(&obd_types_lock);
254         cfs_list_del(&type->typ_chain);
255         cfs_spin_unlock(&obd_types_lock);
256         OBD_FREE(type->typ_name, strlen(name) + 1);
257         if (type->typ_dt_ops != NULL)
258                 OBD_FREE_PTR(type->typ_dt_ops);
259         if (type->typ_md_ops != NULL)
260                 OBD_FREE_PTR(type->typ_md_ops);
261         OBD_FREE(type, sizeof(*type));
262         RETURN(0);
263 } /* class_unregister_type */
264 EXPORT_SYMBOL(class_unregister_type);
265
266 /**
267  * Create a new obd device.
268  *
269  * Find an empty slot in ::obd_devs[], create a new obd device in it.
270  *
271  * \param[in] type_name obd device type string.
272  * \param[in] name      obd device name.
273  *
274  * \retval NULL if create fails, otherwise return the obd device
275  *         pointer created.
276  */
277 struct obd_device *class_newdev(const char *type_name, const char *name)
278 {
279         struct obd_device *result = NULL;
280         struct obd_device *newdev;
281         struct obd_type *type = NULL;
282         int i;
283         int new_obd_minor = 0;
284
285         if (strlen(name) >= MAX_OBD_NAME) {
286                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
287                 RETURN(ERR_PTR(-EINVAL));
288         }
289
290         type = class_get_type(type_name);
291         if (type == NULL){
292                 CERROR("OBD: unknown type: %s\n", type_name);
293                 RETURN(ERR_PTR(-ENODEV));
294         }
295
296         newdev = obd_device_alloc();
297         if (newdev == NULL) {
298                 class_put_type(type);
299                 RETURN(ERR_PTR(-ENOMEM));
300         }
301         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
302
303         cfs_write_lock(&obd_dev_lock);
304         for (i = 0; i < class_devno_max(); i++) {
305                 struct obd_device *obd = class_num2obd(i);
306
307                 if (obd && obd->obd_name &&
308                     (strcmp(name, obd->obd_name) == 0)) {
309                         CERROR("Device %s already exists, won't add\n", name);
310                         if (result) {
311                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
312                                          "%p obd_magic %08x != %08x\n", result,
313                                          result->obd_magic, OBD_DEVICE_MAGIC);
314                                 LASSERTF(result->obd_minor == new_obd_minor,
315                                          "%p obd_minor %d != %d\n", result,
316                                          result->obd_minor, new_obd_minor);
317
318                                 obd_devs[result->obd_minor] = NULL;
319                                 result->obd_name[0]='\0';
320                          }
321                         result = ERR_PTR(-EEXIST);
322                         break;
323                 }
324                 if (!result && !obd) {
325                         result = newdev;
326                         result->obd_minor = i;
327                         new_obd_minor = i;
328                         result->obd_type = type;
329                         strncpy(result->obd_name, name,
330                                 sizeof(result->obd_name) - 1);
331                         obd_devs[i] = result;
332                 }
333         }
334         cfs_write_unlock(&obd_dev_lock);
335
336         if (result == NULL && i >= class_devno_max()) {
337                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
338                        class_devno_max());
339                 result = ERR_PTR(-EOVERFLOW);
340         }
341
342         if (IS_ERR(result)) {
343                 obd_device_free(newdev);
344                 class_put_type(type);
345         } else {
346                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
347                        result->obd_name, result);
348         }
349         return result;
350 }
351
352 void class_release_dev(struct obd_device *obd)
353 {
354         struct obd_type *obd_type = obd->obd_type;
355
356         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
357                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
358         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
359                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
360         LASSERT(obd_type != NULL);
361
362         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
363                obd->obd_name,obd->obd_type->typ_name);
364
365         cfs_write_lock(&obd_dev_lock);
366         obd_devs[obd->obd_minor] = NULL;
367         cfs_write_unlock(&obd_dev_lock);
368         obd_device_free(obd);
369
370         class_put_type(obd_type);
371 }
372
373 int class_name2dev(const char *name)
374 {
375         int i;
376
377         if (!name)
378                 return -1;
379
380         cfs_read_lock(&obd_dev_lock);
381         for (i = 0; i < class_devno_max(); i++) {
382                 struct obd_device *obd = class_num2obd(i);
383
384                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
385                         /* Make sure we finished attaching before we give
386                            out any references */
387                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
388                         if (obd->obd_attached) {
389                                 cfs_read_unlock(&obd_dev_lock);
390                                 return i;
391                         }
392                         break;
393                 }
394         }
395         cfs_read_unlock(&obd_dev_lock);
396
397         return -1;
398 }
399 EXPORT_SYMBOL(class_name2dev);
400
401 struct obd_device *class_name2obd(const char *name)
402 {
403         int dev = class_name2dev(name);
404
405         if (dev < 0 || dev > class_devno_max())
406                 return NULL;
407         return class_num2obd(dev);
408 }
409 EXPORT_SYMBOL(class_name2obd);
410
411 int class_uuid2dev(struct obd_uuid *uuid)
412 {
413         int i;
414
415         cfs_read_lock(&obd_dev_lock);
416         for (i = 0; i < class_devno_max(); i++) {
417                 struct obd_device *obd = class_num2obd(i);
418
419                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
420                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
421                         cfs_read_unlock(&obd_dev_lock);
422                         return i;
423                 }
424         }
425         cfs_read_unlock(&obd_dev_lock);
426
427         return -1;
428 }
429 EXPORT_SYMBOL(class_uuid2dev);
430
431 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
432 {
433         int dev = class_uuid2dev(uuid);
434         if (dev < 0)
435                 return NULL;
436         return class_num2obd(dev);
437 }
438 EXPORT_SYMBOL(class_uuid2obd);
439
440 /**
441  * Get obd device from ::obd_devs[]
442  *
443  * \param num [in] array index
444  *
445  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
446  *         otherwise return the obd device there.
447  */
448 struct obd_device *class_num2obd(int num)
449 {
450         struct obd_device *obd = NULL;
451
452         if (num < class_devno_max()) {
453                 obd = obd_devs[num];
454                 if (obd == NULL)
455                         return NULL;
456
457                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
458                          "%p obd_magic %08x != %08x\n",
459                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
460                 LASSERTF(obd->obd_minor == num,
461                          "%p obd_minor %0d != %0d\n",
462                          obd, obd->obd_minor, num);
463         }
464
465         return obd;
466 }
467 EXPORT_SYMBOL(class_num2obd);
468
469 void class_obd_list(void)
470 {
471         char *status;
472         int i;
473
474         cfs_read_lock(&obd_dev_lock);
475         for (i = 0; i < class_devno_max(); i++) {
476                 struct obd_device *obd = class_num2obd(i);
477
478                 if (obd == NULL)
479                         continue;
480                 if (obd->obd_stopping)
481                         status = "ST";
482                 else if (obd->obd_set_up)
483                         status = "UP";
484                 else if (obd->obd_attached)
485                         status = "AT";
486                 else
487                         status = "--";
488                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
489                          i, status, obd->obd_type->typ_name,
490                          obd->obd_name, obd->obd_uuid.uuid,
491                          cfs_atomic_read(&obd->obd_refcount));
492         }
493         cfs_read_unlock(&obd_dev_lock);
494         return;
495 }
496
497 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
498    specified, then only the client with that uuid is returned,
499    otherwise any client connected to the tgt is returned. */
500 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
501                                           const char * typ_name,
502                                           struct obd_uuid *grp_uuid)
503 {
504         int i;
505
506         cfs_read_lock(&obd_dev_lock);
507         for (i = 0; i < class_devno_max(); i++) {
508                 struct obd_device *obd = class_num2obd(i);
509
510                 if (obd == NULL)
511                         continue;
512                 if ((strncmp(obd->obd_type->typ_name, typ_name,
513                              strlen(typ_name)) == 0)) {
514                         if (obd_uuid_equals(tgt_uuid,
515                                             &obd->u.cli.cl_target_uuid) &&
516                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
517                                                          &obd->obd_uuid) : 1)) {
518                                 cfs_read_unlock(&obd_dev_lock);
519                                 return obd;
520                         }
521                 }
522         }
523         cfs_read_unlock(&obd_dev_lock);
524
525         return NULL;
526 }
527 EXPORT_SYMBOL(class_find_client_obd);
528
529 /* Iterate the obd_device list looking devices have grp_uuid. Start
530    searching at *next, and if a device is found, the next index to look
531    at is saved in *next. If next is NULL, then the first matching device
532    will always be returned. */
533 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
534 {
535         int i;
536
537         if (next == NULL)
538                 i = 0;
539         else if (*next >= 0 && *next < class_devno_max())
540                 i = *next;
541         else
542                 return NULL;
543
544         cfs_read_lock(&obd_dev_lock);
545         for (; i < class_devno_max(); i++) {
546                 struct obd_device *obd = class_num2obd(i);
547
548                 if (obd == NULL)
549                         continue;
550                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
551                         if (next != NULL)
552                                 *next = i+1;
553                         cfs_read_unlock(&obd_dev_lock);
554                         return obd;
555                 }
556         }
557         cfs_read_unlock(&obd_dev_lock);
558
559         return NULL;
560 }
561 EXPORT_SYMBOL(class_devices_in_group);
562
563 /**
564  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
565  * adjust sptlrpc settings accordingly.
566  */
567 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
568 {
569         struct obd_device  *obd;
570         const char         *type;
571         int                 i, rc = 0, rc2;
572
573         LASSERT(namelen > 0);
574
575         cfs_read_lock(&obd_dev_lock);
576         for (i = 0; i < class_devno_max(); i++) {
577                 obd = class_num2obd(i);
578
579                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
580                         continue;
581
582                 /* only notify mdc, osc, mdt, ost */
583                 type = obd->obd_type->typ_name;
584                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
585                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
586                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
587                     strcmp(type, LUSTRE_OST_NAME) != 0)
588                         continue;
589
590                 if (strncmp(obd->obd_name, fsname, namelen))
591                         continue;
592
593                 class_incref(obd, __FUNCTION__, obd);
594                 cfs_read_unlock(&obd_dev_lock);
595                 rc2 = obd_set_info_async(obd->obd_self_export,
596                                          sizeof(KEY_SPTLRPC_CONF),
597                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
598                 rc = rc ? rc : rc2;
599                 class_decref(obd, __FUNCTION__, obd);
600                 cfs_read_lock(&obd_dev_lock);
601         }
602         cfs_read_unlock(&obd_dev_lock);
603         return rc;
604 }
605 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
606
607 void obd_cleanup_caches(void)
608 {
609         int rc;
610
611         ENTRY;
612         if (obd_device_cachep) {
613                 rc = cfs_mem_cache_destroy(obd_device_cachep);
614                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
615                 obd_device_cachep = NULL;
616         }
617         if (obdo_cachep) {
618                 rc = cfs_mem_cache_destroy(obdo_cachep);
619                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
620                 obdo_cachep = NULL;
621         }
622         if (import_cachep) {
623                 rc = cfs_mem_cache_destroy(import_cachep);
624                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
625                 import_cachep = NULL;
626         }
627         if (capa_cachep) {
628                 rc = cfs_mem_cache_destroy(capa_cachep);
629                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
630                 capa_cachep = NULL;
631         }
632         EXIT;
633 }
634
635 int obd_init_caches(void)
636 {
637         ENTRY;
638
639         LASSERT(obd_device_cachep == NULL);
640         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
641                                                  sizeof(struct obd_device),
642                                                  0, 0);
643         if (!obd_device_cachep)
644                 GOTO(out, -ENOMEM);
645
646         LASSERT(obdo_cachep == NULL);
647         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
648                                            0, 0);
649         if (!obdo_cachep)
650                 GOTO(out, -ENOMEM);
651
652         LASSERT(import_cachep == NULL);
653         import_cachep = cfs_mem_cache_create("ll_import_cache",
654                                              sizeof(struct obd_import),
655                                              0, 0);
656         if (!import_cachep)
657                 GOTO(out, -ENOMEM);
658
659         LASSERT(capa_cachep == NULL);
660         capa_cachep = cfs_mem_cache_create("capa_cache",
661                                            sizeof(struct obd_capa), 0, 0);
662         if (!capa_cachep)
663                 GOTO(out, -ENOMEM);
664
665         RETURN(0);
666  out:
667         obd_cleanup_caches();
668         RETURN(-ENOMEM);
669
670 }
671
672 /* map connection to client */
673 struct obd_export *class_conn2export(struct lustre_handle *conn)
674 {
675         struct obd_export *export;
676         ENTRY;
677
678         if (!conn) {
679                 CDEBUG(D_CACHE, "looking for null handle\n");
680                 RETURN(NULL);
681         }
682
683         if (conn->cookie == -1) {  /* this means assign a new connection */
684                 CDEBUG(D_CACHE, "want a new connection\n");
685                 RETURN(NULL);
686         }
687
688         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
689         export = class_handle2object(conn->cookie);
690         RETURN(export);
691 }
692 EXPORT_SYMBOL(class_conn2export);
693
694 struct obd_device *class_exp2obd(struct obd_export *exp)
695 {
696         if (exp)
697                 return exp->exp_obd;
698         return NULL;
699 }
700 EXPORT_SYMBOL(class_exp2obd);
701
702 struct obd_device *class_conn2obd(struct lustre_handle *conn)
703 {
704         struct obd_export *export;
705         export = class_conn2export(conn);
706         if (export) {
707                 struct obd_device *obd = export->exp_obd;
708                 class_export_put(export);
709                 return obd;
710         }
711         return NULL;
712 }
713 EXPORT_SYMBOL(class_conn2obd);
714
715 struct obd_import *class_exp2cliimp(struct obd_export *exp)
716 {
717         struct obd_device *obd = exp->exp_obd;
718         if (obd == NULL)
719                 return NULL;
720         return obd->u.cli.cl_import;
721 }
722 EXPORT_SYMBOL(class_exp2cliimp);
723
724 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
725 {
726         struct obd_device *obd = class_conn2obd(conn);
727         if (obd == NULL)
728                 return NULL;
729         return obd->u.cli.cl_import;
730 }
731 EXPORT_SYMBOL(class_conn2cliimp);
732
733 /* Export management functions */
734 static void class_export_destroy(struct obd_export *exp)
735 {
736         struct obd_device *obd = exp->exp_obd;
737         ENTRY;
738
739         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
740
741         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
742                exp->exp_client_uuid.uuid, obd->obd_name);
743
744         LASSERT(obd != NULL);
745
746         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
747         if (exp->exp_connection)
748                 ptlrpc_put_connection_superhack(exp->exp_connection);
749
750         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
751         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
752         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
753         LASSERT(cfs_list_empty(&exp->exp_queued_rpc));
754         obd_destroy_export(exp);
755         class_decref(obd, "export", exp);
756
757         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
758         EXIT;
759 }
760
761 static void export_handle_addref(void *export)
762 {
763         class_export_get(export);
764 }
765
766 struct obd_export *class_export_get(struct obd_export *exp)
767 {
768         cfs_atomic_inc(&exp->exp_refcount);
769         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
770                cfs_atomic_read(&exp->exp_refcount));
771         return exp;
772 }
773 EXPORT_SYMBOL(class_export_get);
774
775 void class_export_put(struct obd_export *exp)
776 {
777         LASSERT(exp != NULL);
778         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, 0x5a5a5a);
779         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
780                cfs_atomic_read(&exp->exp_refcount) - 1);
781
782         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
783                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
784                 CDEBUG(D_IOCTL, "final put %p/%s\n",
785                        exp, exp->exp_client_uuid.uuid);
786
787                 /* release nid stat refererence */
788                 lprocfs_exp_cleanup(exp);
789
790                 obd_zombie_export_add(exp);
791         }
792 }
793 EXPORT_SYMBOL(class_export_put);
794
795 /* Creates a new export, adds it to the hash table, and returns a
796  * pointer to it. The refcount is 2: one for the hash reference, and
797  * one for the pointer returned by this function. */
798 struct obd_export *class_new_export(struct obd_device *obd,
799                                     struct obd_uuid *cluuid)
800 {
801         struct obd_export *export;
802         cfs_hash_t *hash = NULL;
803         int rc = 0;
804         ENTRY;
805
806         OBD_ALLOC_PTR(export);
807         if (!export)
808                 return ERR_PTR(-ENOMEM);
809
810         export->exp_conn_cnt = 0;
811         export->exp_lock_hash = NULL;
812         cfs_atomic_set(&export->exp_refcount, 2);
813         cfs_atomic_set(&export->exp_rpc_count, 0);
814         cfs_atomic_set(&export->exp_cb_count, 0);
815         cfs_atomic_set(&export->exp_locks_count, 0);
816 #if LUSTRE_TRACKS_LOCK_EXP_REFS
817         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
818         cfs_spin_lock_init(&export->exp_locks_list_guard);
819 #endif
820         cfs_atomic_set(&export->exp_replay_count, 0);
821         export->exp_obd = obd;
822         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
823         cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
824         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
825         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
826         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
827         CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
828         class_handle_hash(&export->exp_handle, export_handle_addref);
829         export->exp_last_request_time = cfs_time_current_sec();
830         cfs_spin_lock_init(&export->exp_lock);
831         cfs_spin_lock_init(&export->exp_rpc_lock);
832         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
833         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
834
835         export->exp_sp_peer = LUSTRE_SP_ANY;
836         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
837         export->exp_client_uuid = *cluuid;
838         obd_init_export(export);
839
840         cfs_spin_lock(&obd->obd_dev_lock);
841          /* shouldn't happen, but might race */
842         if (obd->obd_stopping)
843                 GOTO(exit_unlock, rc = -ENODEV);
844
845         hash = cfs_hash_getref(obd->obd_uuid_hash);
846         if (hash == NULL)
847                 GOTO(exit_unlock, rc = -ENODEV);
848         cfs_spin_unlock(&obd->obd_dev_lock);
849
850         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
851                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
852                 if (rc != 0) {
853                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
854                                       obd->obd_name, cluuid->uuid, rc);
855                         GOTO(exit_err, rc = -EALREADY);
856                 }
857         }
858
859         cfs_spin_lock(&obd->obd_dev_lock);
860         if (obd->obd_stopping) {
861                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
862                 GOTO(exit_unlock, rc = -ENODEV);
863         }
864
865         class_incref(obd, "export", export);
866         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
867         cfs_list_add_tail(&export->exp_obd_chain_timed,
868                           &export->exp_obd->obd_exports_timed);
869         export->exp_obd->obd_num_exports++;
870         cfs_spin_unlock(&obd->obd_dev_lock);
871         cfs_hash_putref(hash);
872         RETURN(export);
873
874 exit_unlock:
875         cfs_spin_unlock(&obd->obd_dev_lock);
876 exit_err:
877         if (hash)
878                 cfs_hash_putref(hash);
879         class_handle_unhash(&export->exp_handle);
880         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
881         obd_destroy_export(export);
882         OBD_FREE_PTR(export);
883         return ERR_PTR(rc);
884 }
885 EXPORT_SYMBOL(class_new_export);
886
887 void class_unlink_export(struct obd_export *exp)
888 {
889         class_handle_unhash(&exp->exp_handle);
890
891         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
892         /* delete an uuid-export hashitem from hashtables */
893         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
894                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
895                              &exp->exp_client_uuid,
896                              &exp->exp_uuid_hash);
897
898         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
899         cfs_list_del_init(&exp->exp_obd_chain_timed);
900         exp->exp_obd->obd_num_exports--;
901         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
902         class_export_put(exp);
903 }
904 EXPORT_SYMBOL(class_unlink_export);
905
906 /* Import management functions */
907 void class_import_destroy(struct obd_import *imp)
908 {
909         ENTRY;
910
911         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
912                 imp->imp_obd->obd_name);
913
914         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
915
916         ptlrpc_put_connection_superhack(imp->imp_connection);
917
918         while (!cfs_list_empty(&imp->imp_conn_list)) {
919                 struct obd_import_conn *imp_conn;
920
921                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
922                                           struct obd_import_conn, oic_item);
923                 cfs_list_del_init(&imp_conn->oic_item);
924                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
925                 OBD_FREE(imp_conn, sizeof(*imp_conn));
926         }
927
928         LASSERT(imp->imp_sec == NULL);
929         class_decref(imp->imp_obd, "import", imp);
930         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
931         EXIT;
932 }
933
934 static void import_handle_addref(void *import)
935 {
936         class_import_get(import);
937 }
938
939 struct obd_import *class_import_get(struct obd_import *import)
940 {
941         cfs_atomic_inc(&import->imp_refcount);
942         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
943                cfs_atomic_read(&import->imp_refcount),
944                import->imp_obd->obd_name);
945         return import;
946 }
947 EXPORT_SYMBOL(class_import_get);
948
949 void class_import_put(struct obd_import *imp)
950 {
951         ENTRY;
952
953         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
954         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, 0x5a5a5a);
955
956         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
957                cfs_atomic_read(&imp->imp_refcount) - 1,
958                imp->imp_obd->obd_name);
959
960         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
961                 CDEBUG(D_INFO, "final put import %p\n", imp);
962                 obd_zombie_import_add(imp);
963         }
964
965         EXIT;
966 }
967 EXPORT_SYMBOL(class_import_put);
968
969 static void init_imp_at(struct imp_at *at) {
970         int i;
971         at_init(&at->iat_net_latency, 0, 0);
972         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
973                 /* max service estimates are tracked on the server side, so
974                    don't use the AT history here, just use the last reported
975                    val. (But keep hist for proc histogram, worst_ever) */
976                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
977                         AT_FLG_NOHIST);
978         }
979 }
980
981 struct obd_import *class_new_import(struct obd_device *obd)
982 {
983         struct obd_import *imp;
984
985         OBD_ALLOC(imp, sizeof(*imp));
986         if (imp == NULL)
987                 return NULL;
988
989         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
990         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
991         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
992         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
993         cfs_spin_lock_init(&imp->imp_lock);
994         imp->imp_last_success_conn = 0;
995         imp->imp_state = LUSTRE_IMP_NEW;
996         imp->imp_obd = class_incref(obd, "import", imp);
997         cfs_sema_init(&imp->imp_sec_mutex, 1);
998         cfs_waitq_init(&imp->imp_recovery_waitq);
999
1000         cfs_atomic_set(&imp->imp_refcount, 2);
1001         cfs_atomic_set(&imp->imp_unregistering, 0);
1002         cfs_atomic_set(&imp->imp_inflight, 0);
1003         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1004         cfs_atomic_set(&imp->imp_inval_count, 0);
1005         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1006         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1007         class_handle_hash(&imp->imp_handle, import_handle_addref);
1008         init_imp_at(&imp->imp_at);
1009
1010         /* the default magic is V2, will be used in connect RPC, and
1011          * then adjusted according to the flags in request/reply. */
1012         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1013
1014         return imp;
1015 }
1016 EXPORT_SYMBOL(class_new_import);
1017
1018 void class_destroy_import(struct obd_import *import)
1019 {
1020         LASSERT(import != NULL);
1021         LASSERT(import != LP_POISON);
1022
1023         class_handle_unhash(&import->imp_handle);
1024
1025         cfs_spin_lock(&import->imp_lock);
1026         import->imp_generation++;
1027         cfs_spin_unlock(&import->imp_lock);
1028         class_import_put(import);
1029 }
1030 EXPORT_SYMBOL(class_destroy_import);
1031
1032 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1033
1034 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1035 {
1036         cfs_spin_lock(&exp->exp_locks_list_guard);
1037
1038         LASSERT(lock->l_exp_refs_nr >= 0);
1039
1040         if (lock->l_exp_refs_target != NULL &&
1041             lock->l_exp_refs_target != exp) {
1042                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1043                               exp, lock, lock->l_exp_refs_target);
1044         }
1045         if ((lock->l_exp_refs_nr ++) == 0) {
1046                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1047                 lock->l_exp_refs_target = exp;
1048         }
1049         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1050                lock, exp, lock->l_exp_refs_nr);
1051         cfs_spin_unlock(&exp->exp_locks_list_guard);
1052 }
1053 EXPORT_SYMBOL(__class_export_add_lock_ref);
1054
1055 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1056 {
1057         cfs_spin_lock(&exp->exp_locks_list_guard);
1058         LASSERT(lock->l_exp_refs_nr > 0);
1059         if (lock->l_exp_refs_target != exp) {
1060                 LCONSOLE_WARN("lock %p, "
1061                               "mismatching export pointers: %p, %p\n",
1062                               lock, lock->l_exp_refs_target, exp);
1063         }
1064         if (-- lock->l_exp_refs_nr == 0) {
1065                 cfs_list_del_init(&lock->l_exp_refs_link);
1066                 lock->l_exp_refs_target = NULL;
1067         }
1068         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1069                lock, exp, lock->l_exp_refs_nr);
1070         cfs_spin_unlock(&exp->exp_locks_list_guard);
1071 }
1072 EXPORT_SYMBOL(__class_export_del_lock_ref);
1073 #endif
1074
1075 /* A connection defines an export context in which preallocation can
1076    be managed. This releases the export pointer reference, and returns
1077    the export handle, so the export refcount is 1 when this function
1078    returns. */
1079 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1080                   struct obd_uuid *cluuid)
1081 {
1082         struct obd_export *export;
1083         LASSERT(conn != NULL);
1084         LASSERT(obd != NULL);
1085         LASSERT(cluuid != NULL);
1086         ENTRY;
1087
1088         export = class_new_export(obd, cluuid);
1089         if (IS_ERR(export))
1090                 RETURN(PTR_ERR(export));
1091
1092         conn->cookie = export->exp_handle.h_cookie;
1093         class_export_put(export);
1094
1095         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1096                cluuid->uuid, conn->cookie);
1097         RETURN(0);
1098 }
1099 EXPORT_SYMBOL(class_connect);
1100
1101 /* if export is involved in recovery then clean up related things */
1102 void class_export_recovery_cleanup(struct obd_export *exp)
1103 {
1104         struct obd_device *obd = exp->exp_obd;
1105
1106         cfs_spin_lock(&obd->obd_recovery_task_lock);
1107         if (exp->exp_delayed)
1108                 obd->obd_delayed_clients--;
1109         if (obd->obd_recovering && exp->exp_in_recovery) {
1110                 cfs_spin_lock(&exp->exp_lock);
1111                 exp->exp_in_recovery = 0;
1112                 cfs_spin_unlock(&exp->exp_lock);
1113                 LASSERT(obd->obd_connected_clients);
1114                 obd->obd_connected_clients--;
1115         }
1116         cfs_spin_unlock(&obd->obd_recovery_task_lock);
1117         /** Cleanup req replay fields */
1118         if (exp->exp_req_replay_needed) {
1119                 cfs_spin_lock(&exp->exp_lock);
1120                 exp->exp_req_replay_needed = 0;
1121                 cfs_spin_unlock(&exp->exp_lock);
1122                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1123                 cfs_atomic_dec(&obd->obd_req_replay_clients);
1124         }
1125         /** Cleanup lock replay data */
1126         if (exp->exp_lock_replay_needed) {
1127                 cfs_spin_lock(&exp->exp_lock);
1128                 exp->exp_lock_replay_needed = 0;
1129                 cfs_spin_unlock(&exp->exp_lock);
1130                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1131                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1132         }
1133 }
1134
1135 /* This function removes 1-3 references from the export:
1136  * 1 - for export pointer passed
1137  * and if disconnect really need
1138  * 2 - removing from hash
1139  * 3 - in client_unlink_export
1140  * The export pointer passed to this function can destroyed */
1141 int class_disconnect(struct obd_export *export)
1142 {
1143         int already_disconnected;
1144         ENTRY;
1145
1146         if (export == NULL) {
1147                 fixme();
1148                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1149                 RETURN(-EINVAL);
1150         }
1151
1152         cfs_spin_lock(&export->exp_lock);
1153         already_disconnected = export->exp_disconnected;
1154         export->exp_disconnected = 1;
1155         cfs_spin_unlock(&export->exp_lock);
1156
1157         /* class_cleanup(), abort_recovery(), and class_fail_export()
1158          * all end up in here, and if any of them race we shouldn't
1159          * call extra class_export_puts(). */
1160         if (already_disconnected) {
1161                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1162                 GOTO(no_disconn, already_disconnected);
1163         }
1164
1165         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1166                export->exp_handle.h_cookie);
1167
1168         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1169                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1170                              &export->exp_connection->c_peer.nid,
1171                              &export->exp_nid_hash);
1172
1173         class_export_recovery_cleanup(export);
1174         class_unlink_export(export);
1175 no_disconn:
1176         class_export_put(export);
1177         RETURN(0);
1178 }
1179 EXPORT_SYMBOL(class_disconnect);
1180
1181 /* Return non-zero for a fully connected export */
1182 int class_connected_export(struct obd_export *exp)
1183 {
1184         if (exp) {
1185                 int connected;
1186                 cfs_spin_lock(&exp->exp_lock);
1187                 connected = (exp->exp_conn_cnt > 0);
1188                 cfs_spin_unlock(&exp->exp_lock);
1189                 return connected;
1190         }
1191         return 0;
1192 }
1193 EXPORT_SYMBOL(class_connected_export);
1194
1195 static void class_disconnect_export_list(cfs_list_t *list,
1196                                          enum obd_option flags)
1197 {
1198         int rc;
1199         struct obd_export *exp;
1200         ENTRY;
1201
1202         /* It's possible that an export may disconnect itself, but
1203          * nothing else will be added to this list. */
1204         while (!cfs_list_empty(list)) {
1205                 exp = cfs_list_entry(list->next, struct obd_export,
1206                                      exp_obd_chain);
1207                 /* need for safe call CDEBUG after obd_disconnect */
1208                 class_export_get(exp);
1209
1210                 cfs_spin_lock(&exp->exp_lock);
1211                 exp->exp_flags = flags;
1212                 cfs_spin_unlock(&exp->exp_lock);
1213
1214                 if (obd_uuid_equals(&exp->exp_client_uuid,
1215                                     &exp->exp_obd->obd_uuid)) {
1216                         CDEBUG(D_HA,
1217                                "exp %p export uuid == obd uuid, don't discon\n",
1218                                exp);
1219                         /* Need to delete this now so we don't end up pointing
1220                          * to work_list later when this export is cleaned up. */
1221                         cfs_list_del_init(&exp->exp_obd_chain);
1222                         class_export_put(exp);
1223                         continue;
1224                 }
1225
1226                 class_export_get(exp);
1227                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1228                        "last request at "CFS_TIME_T"\n",
1229                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1230                        exp, exp->exp_last_request_time);
1231                 /* release one export reference anyway */
1232                 rc = obd_disconnect(exp);
1233
1234                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1235                        obd_export_nid2str(exp), exp, rc);
1236                 class_export_put(exp);
1237         }
1238         EXIT;
1239 }
1240
1241 void class_disconnect_exports(struct obd_device *obd)
1242 {
1243         cfs_list_t work_list;
1244         ENTRY;
1245
1246         /* Move all of the exports from obd_exports to a work list, en masse. */
1247         CFS_INIT_LIST_HEAD(&work_list);
1248         cfs_spin_lock(&obd->obd_dev_lock);
1249         cfs_list_splice_init(&obd->obd_exports, &work_list);
1250         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1251         cfs_spin_unlock(&obd->obd_dev_lock);
1252
1253         if (!cfs_list_empty(&work_list)) {
1254                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1255                        "disconnecting them\n", obd->obd_minor, obd);
1256                 class_disconnect_export_list(&work_list,
1257                                              exp_flags_from_obd(obd));
1258         } else
1259                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1260                        obd->obd_minor, obd);
1261         EXIT;
1262 }
1263 EXPORT_SYMBOL(class_disconnect_exports);
1264
1265 /* Remove exports that have not completed recovery.
1266  */
1267 void class_disconnect_stale_exports(struct obd_device *obd,
1268                                     int (*test_export)(struct obd_export *))
1269 {
1270         cfs_list_t work_list;
1271         cfs_list_t *pos, *n;
1272         struct obd_export *exp;
1273         int evicted = 0;
1274         ENTRY;
1275
1276         CFS_INIT_LIST_HEAD(&work_list);
1277         cfs_spin_lock(&obd->obd_dev_lock);
1278         cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1279                 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1280                 if (test_export(exp))
1281                         continue;
1282
1283                 /* don't count self-export as client */
1284                 if (obd_uuid_equals(&exp->exp_client_uuid,
1285                                     &exp->exp_obd->obd_uuid))
1286                         continue;
1287
1288                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1289                 evicted++;
1290                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1291                        obd->obd_name, exp->exp_client_uuid.uuid,
1292                        exp->exp_connection == NULL ? "<unknown>" :
1293                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1294                 print_export_data(exp, "EVICTING", 0);
1295         }
1296         cfs_spin_unlock(&obd->obd_dev_lock);
1297
1298         if (evicted) {
1299                 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1300                        obd->obd_name, evicted);
1301                 obd->obd_stale_clients += evicted;
1302         }
1303         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1304                                                  OBD_OPT_ABORT_RECOV);
1305         EXIT;
1306 }
1307 EXPORT_SYMBOL(class_disconnect_stale_exports);
1308
1309 void class_fail_export(struct obd_export *exp)
1310 {
1311         int rc, already_failed;
1312
1313         cfs_spin_lock(&exp->exp_lock);
1314         already_failed = exp->exp_failed;
1315         exp->exp_failed = 1;
1316         cfs_spin_unlock(&exp->exp_lock);
1317
1318         if (already_failed) {
1319                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1320                        exp, exp->exp_client_uuid.uuid);
1321                 return;
1322         }
1323
1324         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1325                exp, exp->exp_client_uuid.uuid);
1326
1327         if (obd_dump_on_timeout)
1328                 libcfs_debug_dumplog();
1329
1330         /* Most callers into obd_disconnect are removing their own reference
1331          * (request, for example) in addition to the one from the hash table.
1332          * We don't have such a reference here, so make one. */
1333         class_export_get(exp);
1334         rc = obd_disconnect(exp);
1335         if (rc)
1336                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1337         else
1338                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1339                        exp, exp->exp_client_uuid.uuid);
1340 }
1341 EXPORT_SYMBOL(class_fail_export);
1342
1343 char *obd_export_nid2str(struct obd_export *exp)
1344 {
1345         if (exp->exp_connection != NULL)
1346                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1347
1348         return "(no nid)";
1349 }
1350 EXPORT_SYMBOL(obd_export_nid2str);
1351
1352 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1353 {
1354         struct obd_export *doomed_exp = NULL;
1355         int exports_evicted = 0;
1356
1357         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1358
1359         do {
1360                 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1361                 if (doomed_exp == NULL)
1362                         break;
1363
1364                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1365                          "nid %s found, wanted nid %s, requested nid %s\n",
1366                          obd_export_nid2str(doomed_exp),
1367                          libcfs_nid2str(nid_key), nid);
1368                 LASSERTF(doomed_exp != obd->obd_self_export,
1369                          "self-export is hashed by NID?\n");
1370                 exports_evicted++;
1371                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1372                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1373                        exports_evicted);
1374                 class_fail_export(doomed_exp);
1375                 class_export_put(doomed_exp);
1376         } while (1);
1377
1378         if (!exports_evicted)
1379                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1380                        obd->obd_name, nid);
1381         return exports_evicted;
1382 }
1383 EXPORT_SYMBOL(obd_export_evict_by_nid);
1384
1385 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1386 {
1387         struct obd_export *doomed_exp = NULL;
1388         struct obd_uuid doomed_uuid;
1389         int exports_evicted = 0;
1390
1391         obd_str2uuid(&doomed_uuid, uuid);
1392         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1393                 CERROR("%s: can't evict myself\n", obd->obd_name);
1394                 return exports_evicted;
1395         }
1396
1397         doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1398
1399         if (doomed_exp == NULL) {
1400                 CERROR("%s: can't disconnect %s: no exports found\n",
1401                        obd->obd_name, uuid);
1402         } else {
1403                 CWARN("%s: evicting %s at adminstrative request\n",
1404                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1405                 class_fail_export(doomed_exp);
1406                 class_export_put(doomed_exp);
1407                 exports_evicted++;
1408         }
1409
1410         return exports_evicted;
1411 }
1412 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1413
1414 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1415 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1416 EXPORT_SYMBOL(class_export_dump_hook);
1417 #endif
1418
1419 static void print_export_data(struct obd_export *exp, const char *status,
1420                               int locks)
1421 {
1422         struct ptlrpc_reply_state *rs;
1423         struct ptlrpc_reply_state *first_reply = NULL;
1424         int nreplies = 0;
1425
1426         cfs_spin_lock(&exp->exp_lock);
1427         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1428                                 rs_exp_list) {
1429                 if (nreplies == 0)
1430                         first_reply = rs;
1431                 nreplies++;
1432         }
1433         cfs_spin_unlock(&exp->exp_lock);
1434
1435         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1436                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1437                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1438                cfs_atomic_read(&exp->exp_rpc_count),
1439                cfs_atomic_read(&exp->exp_cb_count),
1440                cfs_atomic_read(&exp->exp_locks_count),
1441                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1442                nreplies, first_reply, nreplies > 3 ? "..." : "",
1443                exp->exp_last_committed);
1444 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1445         if (locks && class_export_dump_hook != NULL)
1446                 class_export_dump_hook(exp);
1447 #endif
1448 }
1449
1450 void dump_exports(struct obd_device *obd, int locks)
1451 {
1452         struct obd_export *exp;
1453
1454         cfs_spin_lock(&obd->obd_dev_lock);
1455         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1456                 print_export_data(exp, "ACTIVE", locks);
1457         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1458                 print_export_data(exp, "UNLINKED", locks);
1459         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1460                 print_export_data(exp, "DELAYED", locks);
1461         cfs_spin_unlock(&obd->obd_dev_lock);
1462         cfs_spin_lock(&obd_zombie_impexp_lock);
1463         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1464                 print_export_data(exp, "ZOMBIE", locks);
1465         cfs_spin_unlock(&obd_zombie_impexp_lock);
1466 }
1467 EXPORT_SYMBOL(dump_exports);
1468
1469 void obd_exports_barrier(struct obd_device *obd)
1470 {
1471         int waited = 2;
1472         LASSERT(cfs_list_empty(&obd->obd_exports));
1473         cfs_spin_lock(&obd->obd_dev_lock);
1474         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1475                 cfs_spin_unlock(&obd->obd_dev_lock);
1476                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1477                                                    cfs_time_seconds(waited));
1478                 if (waited > 5 && IS_PO2(waited)) {
1479                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1480                                       "more than %d seconds. "
1481                                       "The obd refcount = %d. Is it stuck?\n",
1482                                       obd->obd_name, waited,
1483                                       cfs_atomic_read(&obd->obd_refcount));
1484                         dump_exports(obd, 1);
1485                 }
1486                 waited *= 2;
1487                 cfs_spin_lock(&obd->obd_dev_lock);
1488         }
1489         cfs_spin_unlock(&obd->obd_dev_lock);
1490 }
1491 EXPORT_SYMBOL(obd_exports_barrier);
1492
1493 /* Total amount of zombies to be destroyed */
1494 static int zombies_count = 0;
1495
1496 /**
1497  * kill zombie imports and exports
1498  */
1499 void obd_zombie_impexp_cull(void)
1500 {
1501         struct obd_import *import;
1502         struct obd_export *export;
1503         ENTRY;
1504
1505         do {
1506                 cfs_spin_lock(&obd_zombie_impexp_lock);
1507
1508                 import = NULL;
1509                 if (!cfs_list_empty(&obd_zombie_imports)) {
1510                         import = cfs_list_entry(obd_zombie_imports.next,
1511                                                 struct obd_import,
1512                                                 imp_zombie_chain);
1513                         cfs_list_del_init(&import->imp_zombie_chain);
1514                 }
1515
1516                 export = NULL;
1517                 if (!cfs_list_empty(&obd_zombie_exports)) {
1518                         export = cfs_list_entry(obd_zombie_exports.next,
1519                                                 struct obd_export,
1520                                                 exp_obd_chain);
1521                         cfs_list_del_init(&export->exp_obd_chain);
1522                 }
1523
1524                 cfs_spin_unlock(&obd_zombie_impexp_lock);
1525
1526                 if (import != NULL) {
1527                         class_import_destroy(import);
1528                         cfs_spin_lock(&obd_zombie_impexp_lock);
1529                         zombies_count--;
1530                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1531                 }
1532
1533                 if (export != NULL) {
1534                         class_export_destroy(export);
1535                         cfs_spin_lock(&obd_zombie_impexp_lock);
1536                         zombies_count--;
1537                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1538                 }
1539
1540                 cfs_cond_resched();
1541         } while (import != NULL || export != NULL);
1542         EXIT;
1543 }
1544
1545 static cfs_completion_t         obd_zombie_start;
1546 static cfs_completion_t         obd_zombie_stop;
1547 static unsigned long            obd_zombie_flags;
1548 static cfs_waitq_t              obd_zombie_waitq;
1549 static pid_t                    obd_zombie_pid;
1550
1551 enum {
1552         OBD_ZOMBIE_STOP   = 1 << 1
1553 };
1554
1555 /**
1556  * check for work for kill zombie import/export thread.
1557  */
1558 static int obd_zombie_impexp_check(void *arg)
1559 {
1560         int rc;
1561
1562         cfs_spin_lock(&obd_zombie_impexp_lock);
1563         rc = (zombies_count == 0) &&
1564              !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1565         cfs_spin_unlock(&obd_zombie_impexp_lock);
1566
1567         RETURN(rc);
1568 }
1569
1570 /**
1571  * Add export to the obd_zombe thread and notify it.
1572  */
1573 static void obd_zombie_export_add(struct obd_export *exp) {
1574         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1575         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1576         cfs_list_del_init(&exp->exp_obd_chain);
1577         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1578         cfs_spin_lock(&obd_zombie_impexp_lock);
1579         zombies_count++;
1580         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1581         cfs_spin_unlock(&obd_zombie_impexp_lock);
1582
1583         if (obd_zombie_impexp_notify != NULL)
1584                 obd_zombie_impexp_notify();
1585 }
1586
1587 /**
1588  * Add import to the obd_zombe thread and notify it.
1589  */
1590 static void obd_zombie_import_add(struct obd_import *imp) {
1591         LASSERT(imp->imp_sec == NULL);
1592         cfs_spin_lock(&obd_zombie_impexp_lock);
1593         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1594         zombies_count++;
1595         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1596         cfs_spin_unlock(&obd_zombie_impexp_lock);
1597
1598         if (obd_zombie_impexp_notify != NULL)
1599                 obd_zombie_impexp_notify();
1600 }
1601
1602 /**
1603  * notify import/export destroy thread about new zombie.
1604  */
1605 static void obd_zombie_impexp_notify(void)
1606 {
1607         /*
1608          * Make sure obd_zomebie_impexp_thread get this notification.
1609          * It is possible this signal only get by obd_zombie_barrier, and
1610          * barrier gulps this notification and sleeps away and hangs ensues
1611          */
1612         cfs_waitq_broadcast(&obd_zombie_waitq);
1613 }
1614
1615 /**
1616  * check whether obd_zombie is idle
1617  */
1618 static int obd_zombie_is_idle(void)
1619 {
1620         int rc;
1621
1622         LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1623         cfs_spin_lock(&obd_zombie_impexp_lock);
1624         rc = (zombies_count == 0);
1625         cfs_spin_unlock(&obd_zombie_impexp_lock);
1626         return rc;
1627 }
1628
1629 /**
1630  * wait when obd_zombie import/export queues become empty
1631  */
1632 void obd_zombie_barrier(void)
1633 {
1634         struct l_wait_info lwi = { 0 };
1635
1636         if (obd_zombie_pid == cfs_curproc_pid())
1637                 /* don't wait for myself */
1638                 return;
1639         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1640 }
1641 EXPORT_SYMBOL(obd_zombie_barrier);
1642
1643 #ifdef __KERNEL__
1644
1645 /**
1646  * destroy zombie export/import thread.
1647  */
1648 static int obd_zombie_impexp_thread(void *unused)
1649 {
1650         int rc;
1651
1652         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1653                 cfs_complete(&obd_zombie_start);
1654                 RETURN(rc);
1655         }
1656
1657         cfs_complete(&obd_zombie_start);
1658
1659         obd_zombie_pid = cfs_curproc_pid();
1660
1661         while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1662                 struct l_wait_info lwi = { 0 };
1663
1664                 l_wait_event(obd_zombie_waitq,
1665                              !obd_zombie_impexp_check(NULL), &lwi);
1666                 obd_zombie_impexp_cull();
1667
1668                 /*
1669                  * Notify obd_zombie_barrier callers that queues
1670                  * may be empty.
1671                  */
1672                 cfs_waitq_signal(&obd_zombie_waitq);
1673         }
1674
1675         cfs_complete(&obd_zombie_stop);
1676
1677         RETURN(0);
1678 }
1679
1680 #else /* ! KERNEL */
1681
1682 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1683 static void *obd_zombie_impexp_work_cb;
1684 static void *obd_zombie_impexp_idle_cb;
1685
1686 int obd_zombie_impexp_kill(void *arg)
1687 {
1688         int rc = 0;
1689
1690         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1691                 obd_zombie_impexp_cull();
1692                 rc = 1;
1693         }
1694         cfs_atomic_dec(&zombie_recur);
1695         return rc;
1696 }
1697
1698 #endif
1699
1700 /**
1701  * start destroy zombie import/export thread
1702  */
1703 int obd_zombie_impexp_init(void)
1704 {
1705         int rc;
1706
1707         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1708         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1709         cfs_spin_lock_init(&obd_zombie_impexp_lock);
1710         cfs_init_completion(&obd_zombie_start);
1711         cfs_init_completion(&obd_zombie_stop);
1712         cfs_waitq_init(&obd_zombie_waitq);
1713         obd_zombie_pid = 0;
1714
1715 #ifdef __KERNEL__
1716         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1717         if (rc < 0)
1718                 RETURN(rc);
1719
1720         cfs_wait_for_completion(&obd_zombie_start);
1721 #else
1722
1723         obd_zombie_impexp_work_cb =
1724                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1725                                                  &obd_zombie_impexp_kill, NULL);
1726
1727         obd_zombie_impexp_idle_cb =
1728                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1729                                                  &obd_zombie_impexp_check, NULL);
1730         rc = 0;
1731 #endif
1732         RETURN(rc);
1733 }
1734 /**
1735  * stop destroy zombie import/export thread
1736  */
1737 void obd_zombie_impexp_stop(void)
1738 {
1739         cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1740         obd_zombie_impexp_notify();
1741 #ifdef __KERNEL__
1742         cfs_wait_for_completion(&obd_zombie_stop);
1743 #else
1744         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1745         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1746 #endif
1747 }
1748
1749 /***** Kernel-userspace comm helpers *******/
1750
1751 /* Get length of entire message, including header */
1752 int kuc_len(int payload_len)
1753 {
1754         return sizeof(struct kuc_hdr) + payload_len;
1755 }
1756 EXPORT_SYMBOL(kuc_len);
1757
1758 /* Get a pointer to kuc header, given a ptr to the payload
1759  * @param p Pointer to payload area
1760  * @returns Pointer to kuc header
1761  */
1762 struct kuc_hdr * kuc_ptr(void *p)
1763 {
1764         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1765         LASSERT(lh->kuc_magic == KUC_MAGIC);
1766         return lh;
1767 }
1768 EXPORT_SYMBOL(kuc_ptr);
1769
1770 /* Test if payload is part of kuc message
1771  * @param p Pointer to payload area
1772  * @returns boolean
1773  */
1774 int kuc_ispayload(void *p)
1775 {
1776         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1777
1778         if (kh->kuc_magic == KUC_MAGIC)
1779                 return 1;
1780         else
1781                 return 0;
1782 }
1783 EXPORT_SYMBOL(kuc_ispayload);
1784
1785 /* Alloc space for a message, and fill in header
1786  * @return Pointer to payload area
1787  */
1788 void *kuc_alloc(int payload_len, int transport, int type)
1789 {
1790         struct kuc_hdr *lh;
1791         int len = kuc_len(payload_len);
1792
1793         OBD_ALLOC(lh, len);
1794         if (lh == NULL)
1795                 return ERR_PTR(-ENOMEM);
1796
1797         lh->kuc_magic = KUC_MAGIC;
1798         lh->kuc_transport = transport;
1799         lh->kuc_msgtype = type;
1800         lh->kuc_msglen = len;
1801
1802         return (void *)(lh + 1);
1803 }
1804 EXPORT_SYMBOL(kuc_alloc);
1805
1806 /* Takes pointer to payload area */
1807 inline void kuc_free(void *p, int payload_len)
1808 {
1809         struct kuc_hdr *lh = kuc_ptr(p);
1810         OBD_FREE(lh, kuc_len(payload_len));
1811 }
1812 EXPORT_SYMBOL(kuc_free);
1813
1814
1815