Whamcloud - gitweb
91b6262ed2053ace907a369bad18bea4b54d69e9
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
50 extern cfs_list_t obd_types;
51 spinlock_t obd_types_lock;
52
53 struct kmem_cache *obd_device_cachep;
54 struct kmem_cache *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 struct kmem_cache *import_cachep;
57
58 cfs_list_t      obd_zombie_imports;
59 cfs_list_t      obd_zombie_exports;
60 spinlock_t  obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, __GFP_IO);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         cfs_list_t *tmp;
102         struct obd_type *type;
103
104         spin_lock(&obd_types_lock);
105         cfs_list_for_each(tmp, &obd_types) {
106                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115 EXPORT_SYMBOL(class_search_type);
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124
125                 if (strcmp(modname, "obdfilter") == 0)
126                         modname = "ofd";
127
128                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129                         modname = LUSTRE_OSP_NAME;
130
131                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132                         modname = LUSTRE_MDT_NAME;
133
134                 if (!cfs_request_module("%s", modname)) {
135                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136                         type = class_search_type(name);
137                 } else {
138                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139                                            modname);
140                 }
141         }
142 #endif
143         if (type) {
144                 spin_lock(&type->obd_type_lock);
145                 type->typ_refcnt++;
146                 cfs_try_module_get(type->typ_dt_ops->o_owner);
147                 spin_unlock(&type->obd_type_lock);
148         }
149         return type;
150 }
151 EXPORT_SYMBOL(class_get_type);
152
153 void class_put_type(struct obd_type *type)
154 {
155         LASSERT(type);
156         spin_lock(&type->obd_type_lock);
157         type->typ_refcnt--;
158         cfs_module_put(type->typ_dt_ops->o_owner);
159         spin_unlock(&type->obd_type_lock);
160 }
161 EXPORT_SYMBOL(class_put_type);
162
163 #define CLASS_MAX_NAME 1024
164
165 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
166                         struct lprocfs_vars *vars, const char *name,
167                         struct lu_device_type *ldt)
168 {
169         struct obd_type *type;
170         int rc = 0;
171         ENTRY;
172
173         /* sanity check */
174         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
175
176         if (class_search_type(name)) {
177                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
178                 RETURN(-EEXIST);
179         }
180
181         rc = -ENOMEM;
182         OBD_ALLOC(type, sizeof(*type));
183         if (type == NULL)
184                 RETURN(rc);
185
186         OBD_ALLOC_PTR(type->typ_dt_ops);
187         OBD_ALLOC_PTR(type->typ_md_ops);
188         OBD_ALLOC(type->typ_name, strlen(name) + 1);
189
190         if (type->typ_dt_ops == NULL ||
191             type->typ_md_ops == NULL ||
192             type->typ_name == NULL)
193                 GOTO (failed, rc);
194
195         *(type->typ_dt_ops) = *dt_ops;
196         /* md_ops is optional */
197         if (md_ops)
198                 *(type->typ_md_ops) = *md_ops;
199         strcpy(type->typ_name, name);
200         spin_lock_init(&type->obd_type_lock);
201
202 #ifdef LPROCFS
203         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
204                                               vars, type);
205         if (IS_ERR(type->typ_procroot)) {
206                 rc = PTR_ERR(type->typ_procroot);
207                 type->typ_procroot = NULL;
208                 GOTO (failed, rc);
209         }
210 #endif
211         if (ldt != NULL) {
212                 type->typ_lu = ldt;
213                 rc = lu_device_type_init(ldt);
214                 if (rc != 0)
215                         GOTO (failed, rc);
216         }
217
218         spin_lock(&obd_types_lock);
219         cfs_list_add(&type->typ_chain, &obd_types);
220         spin_unlock(&obd_types_lock);
221
222         RETURN (0);
223
224  failed:
225         if (type->typ_name != NULL)
226                 OBD_FREE(type->typ_name, strlen(name) + 1);
227         if (type->typ_md_ops != NULL)
228                 OBD_FREE_PTR(type->typ_md_ops);
229         if (type->typ_dt_ops != NULL)
230                 OBD_FREE_PTR(type->typ_dt_ops);
231         OBD_FREE(type, sizeof(*type));
232         RETURN(rc);
233 }
234 EXPORT_SYMBOL(class_register_type);
235
236 int class_unregister_type(const char *name)
237 {
238         struct obd_type *type = class_search_type(name);
239         ENTRY;
240
241         if (!type) {
242                 CERROR("unknown obd type\n");
243                 RETURN(-EINVAL);
244         }
245
246         if (type->typ_refcnt) {
247                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
248                 /* This is a bad situation, let's make the best of it */
249                 /* Remove ops, but leave the name for debugging */
250                 OBD_FREE_PTR(type->typ_dt_ops);
251                 OBD_FREE_PTR(type->typ_md_ops);
252                 RETURN(-EBUSY);
253         }
254
255         /* we do not use type->typ_procroot as for compatibility purposes
256          * other modules can share names (i.e. lod can use lov entry). so
257          * we can't reference pointer as it can get invalided when another
258          * module removes the entry */
259         lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
260
261         if (type->typ_lu)
262                 lu_device_type_fini(type->typ_lu);
263
264         spin_lock(&obd_types_lock);
265         cfs_list_del(&type->typ_chain);
266         spin_unlock(&obd_types_lock);
267         OBD_FREE(type->typ_name, strlen(name) + 1);
268         if (type->typ_dt_ops != NULL)
269                 OBD_FREE_PTR(type->typ_dt_ops);
270         if (type->typ_md_ops != NULL)
271                 OBD_FREE_PTR(type->typ_md_ops);
272         OBD_FREE(type, sizeof(*type));
273         RETURN(0);
274 } /* class_unregister_type */
275 EXPORT_SYMBOL(class_unregister_type);
276
277 /**
278  * Create a new obd device.
279  *
280  * Find an empty slot in ::obd_devs[], create a new obd device in it.
281  *
282  * \param[in] type_name obd device type string.
283  * \param[in] name      obd device name.
284  *
285  * \retval NULL if create fails, otherwise return the obd device
286  *         pointer created.
287  */
288 struct obd_device *class_newdev(const char *type_name, const char *name)
289 {
290         struct obd_device *result = NULL;
291         struct obd_device *newdev;
292         struct obd_type *type = NULL;
293         int i;
294         int new_obd_minor = 0;
295         ENTRY;
296
297         if (strlen(name) >= MAX_OBD_NAME) {
298                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
299                 RETURN(ERR_PTR(-EINVAL));
300         }
301
302         type = class_get_type(type_name);
303         if (type == NULL){
304                 CERROR("OBD: unknown type: %s\n", type_name);
305                 RETURN(ERR_PTR(-ENODEV));
306         }
307
308         newdev = obd_device_alloc();
309         if (newdev == NULL)
310                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
311
312         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
313
314         write_lock(&obd_dev_lock);
315         for (i = 0; i < class_devno_max(); i++) {
316                 struct obd_device *obd = class_num2obd(i);
317
318                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
319                         CERROR("Device %s already exists at %d, won't add\n",
320                                name, i);
321                         if (result) {
322                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
323                                          "%p obd_magic %08x != %08x\n", result,
324                                          result->obd_magic, OBD_DEVICE_MAGIC);
325                                 LASSERTF(result->obd_minor == new_obd_minor,
326                                          "%p obd_minor %d != %d\n", result,
327                                          result->obd_minor, new_obd_minor);
328
329                                 obd_devs[result->obd_minor] = NULL;
330                                 result->obd_name[0]='\0';
331                          }
332                         result = ERR_PTR(-EEXIST);
333                         break;
334                 }
335                 if (!result && !obd) {
336                         result = newdev;
337                         result->obd_minor = i;
338                         new_obd_minor = i;
339                         result->obd_type = type;
340                         strncpy(result->obd_name, name,
341                                 sizeof(result->obd_name) - 1);
342                         obd_devs[i] = result;
343                 }
344         }
345         write_unlock(&obd_dev_lock);
346
347         if (result == NULL && i >= class_devno_max()) {
348                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
349                        class_devno_max());
350                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
351         }
352
353         if (IS_ERR(result))
354                 GOTO(out, result);
355
356         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
357                result->obd_name, result);
358
359         RETURN(result);
360 out:
361         obd_device_free(newdev);
362 out_type:
363         class_put_type(type);
364         return result;
365 }
366
367 void class_release_dev(struct obd_device *obd)
368 {
369         struct obd_type *obd_type = obd->obd_type;
370
371         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
372                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
373         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
374                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
375         LASSERT(obd_type != NULL);
376
377         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
378                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
379
380         write_lock(&obd_dev_lock);
381         obd_devs[obd->obd_minor] = NULL;
382         write_unlock(&obd_dev_lock);
383         obd_device_free(obd);
384
385         class_put_type(obd_type);
386 }
387
388 int class_name2dev(const char *name)
389 {
390         int i;
391
392         if (!name)
393                 return -1;
394
395         read_lock(&obd_dev_lock);
396         for (i = 0; i < class_devno_max(); i++) {
397                 struct obd_device *obd = class_num2obd(i);
398
399                 if (obd && strcmp(name, obd->obd_name) == 0) {
400                         /* Make sure we finished attaching before we give
401                            out any references */
402                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
403                         if (obd->obd_attached) {
404                                 read_unlock(&obd_dev_lock);
405                                 return i;
406                         }
407                         break;
408                 }
409         }
410         read_unlock(&obd_dev_lock);
411
412         return -1;
413 }
414 EXPORT_SYMBOL(class_name2dev);
415
416 struct obd_device *class_name2obd(const char *name)
417 {
418         int dev = class_name2dev(name);
419
420         if (dev < 0 || dev > class_devno_max())
421                 return NULL;
422         return class_num2obd(dev);
423 }
424 EXPORT_SYMBOL(class_name2obd);
425
426 int class_uuid2dev(struct obd_uuid *uuid)
427 {
428         int i;
429
430         read_lock(&obd_dev_lock);
431         for (i = 0; i < class_devno_max(); i++) {
432                 struct obd_device *obd = class_num2obd(i);
433
434                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
435                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
436                         read_unlock(&obd_dev_lock);
437                         return i;
438                 }
439         }
440         read_unlock(&obd_dev_lock);
441
442         return -1;
443 }
444 EXPORT_SYMBOL(class_uuid2dev);
445
446 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
447 {
448         int dev = class_uuid2dev(uuid);
449         if (dev < 0)
450                 return NULL;
451         return class_num2obd(dev);
452 }
453 EXPORT_SYMBOL(class_uuid2obd);
454
455 /**
456  * Get obd device from ::obd_devs[]
457  *
458  * \param num [in] array index
459  *
460  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
461  *         otherwise return the obd device there.
462  */
463 struct obd_device *class_num2obd(int num)
464 {
465         struct obd_device *obd = NULL;
466
467         if (num < class_devno_max()) {
468                 obd = obd_devs[num];
469                 if (obd == NULL)
470                         return NULL;
471
472                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
473                          "%p obd_magic %08x != %08x\n",
474                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
475                 LASSERTF(obd->obd_minor == num,
476                          "%p obd_minor %0d != %0d\n",
477                          obd, obd->obd_minor, num);
478         }
479
480         return obd;
481 }
482 EXPORT_SYMBOL(class_num2obd);
483
484 /**
485  * Get obd devices count. Device in any
486  *    state are counted
487  * \retval obd device count
488  */
489 int get_devices_count(void)
490 {
491         int index, max_index = class_devno_max(), dev_count = 0;
492
493         read_lock(&obd_dev_lock);
494         for (index = 0; index <= max_index; index++) {
495                 struct obd_device *obd = class_num2obd(index);
496                 if (obd != NULL)
497                         dev_count++;
498         }
499         read_unlock(&obd_dev_lock);
500
501         return dev_count;
502 }
503 EXPORT_SYMBOL(get_devices_count);
504
505 void class_obd_list(void)
506 {
507         char *status;
508         int i;
509
510         read_lock(&obd_dev_lock);
511         for (i = 0; i < class_devno_max(); i++) {
512                 struct obd_device *obd = class_num2obd(i);
513
514                 if (obd == NULL)
515                         continue;
516                 if (obd->obd_stopping)
517                         status = "ST";
518                 else if (obd->obd_set_up)
519                         status = "UP";
520                 else if (obd->obd_attached)
521                         status = "AT";
522                 else
523                         status = "--";
524                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
525                          i, status, obd->obd_type->typ_name,
526                          obd->obd_name, obd->obd_uuid.uuid,
527                          cfs_atomic_read(&obd->obd_refcount));
528         }
529         read_unlock(&obd_dev_lock);
530         return;
531 }
532
533 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
534    specified, then only the client with that uuid is returned,
535    otherwise any client connected to the tgt is returned. */
536 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
537                                           const char * typ_name,
538                                           struct obd_uuid *grp_uuid)
539 {
540         int i;
541
542         read_lock(&obd_dev_lock);
543         for (i = 0; i < class_devno_max(); i++) {
544                 struct obd_device *obd = class_num2obd(i);
545
546                 if (obd == NULL)
547                         continue;
548                 if ((strncmp(obd->obd_type->typ_name, typ_name,
549                              strlen(typ_name)) == 0)) {
550                         if (obd_uuid_equals(tgt_uuid,
551                                             &obd->u.cli.cl_target_uuid) &&
552                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
553                                                          &obd->obd_uuid) : 1)) {
554                                 read_unlock(&obd_dev_lock);
555                                 return obd;
556                         }
557                 }
558         }
559         read_unlock(&obd_dev_lock);
560
561         return NULL;
562 }
563 EXPORT_SYMBOL(class_find_client_obd);
564
565 /* Iterate the obd_device list looking devices have grp_uuid. Start
566    searching at *next, and if a device is found, the next index to look
567    at is saved in *next. If next is NULL, then the first matching device
568    will always be returned. */
569 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
570 {
571         int i;
572
573         if (next == NULL)
574                 i = 0;
575         else if (*next >= 0 && *next < class_devno_max())
576                 i = *next;
577         else
578                 return NULL;
579
580         read_lock(&obd_dev_lock);
581         for (; i < class_devno_max(); i++) {
582                 struct obd_device *obd = class_num2obd(i);
583
584                 if (obd == NULL)
585                         continue;
586                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
587                         if (next != NULL)
588                                 *next = i+1;
589                         read_unlock(&obd_dev_lock);
590                         return obd;
591                 }
592         }
593         read_unlock(&obd_dev_lock);
594
595         return NULL;
596 }
597 EXPORT_SYMBOL(class_devices_in_group);
598
599 /**
600  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
601  * adjust sptlrpc settings accordingly.
602  */
603 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
604 {
605         struct obd_device  *obd;
606         const char         *type;
607         int                 i, rc = 0, rc2;
608
609         LASSERT(namelen > 0);
610
611         read_lock(&obd_dev_lock);
612         for (i = 0; i < class_devno_max(); i++) {
613                 obd = class_num2obd(i);
614
615                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
616                         continue;
617
618                 /* only notify mdc, osc, mdt, ost */
619                 type = obd->obd_type->typ_name;
620                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
621                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
622                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
623                     strcmp(type, LUSTRE_OST_NAME) != 0)
624                         continue;
625
626                 if (strncmp(obd->obd_name, fsname, namelen))
627                         continue;
628
629                 class_incref(obd, __FUNCTION__, obd);
630                 read_unlock(&obd_dev_lock);
631                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
632                                          sizeof(KEY_SPTLRPC_CONF),
633                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
634                 rc = rc ? rc : rc2;
635                 class_decref(obd, __FUNCTION__, obd);
636                 read_lock(&obd_dev_lock);
637         }
638         read_unlock(&obd_dev_lock);
639         return rc;
640 }
641 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
642
643 void obd_cleanup_caches(void)
644 {
645         ENTRY;
646         if (obd_device_cachep) {
647                 kmem_cache_destroy(obd_device_cachep);
648                 obd_device_cachep = NULL;
649         }
650         if (obdo_cachep) {
651                 kmem_cache_destroy(obdo_cachep);
652                 obdo_cachep = NULL;
653         }
654         if (import_cachep) {
655                 kmem_cache_destroy(import_cachep);
656                 import_cachep = NULL;
657         }
658         if (capa_cachep) {
659                 kmem_cache_destroy(capa_cachep);
660                 capa_cachep = NULL;
661         }
662         EXIT;
663 }
664
665 int obd_init_caches(void)
666 {
667         ENTRY;
668
669         LASSERT(obd_device_cachep == NULL);
670         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
671                                               sizeof(struct obd_device),
672                                               0, 0, NULL);
673         if (!obd_device_cachep)
674                 GOTO(out, -ENOMEM);
675
676         LASSERT(obdo_cachep == NULL);
677         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
678                                         0, 0, NULL);
679         if (!obdo_cachep)
680                 GOTO(out, -ENOMEM);
681
682         LASSERT(import_cachep == NULL);
683         import_cachep = kmem_cache_create("ll_import_cache",
684                                           sizeof(struct obd_import),
685                                           0, 0, NULL);
686         if (!import_cachep)
687                 GOTO(out, -ENOMEM);
688
689         LASSERT(capa_cachep == NULL);
690         capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
691                                         0, 0, NULL);
692         if (!capa_cachep)
693                 GOTO(out, -ENOMEM);
694
695         RETURN(0);
696 out:
697         obd_cleanup_caches();
698         RETURN(-ENOMEM);
699
700 }
701
702 /* map connection to client */
703 struct obd_export *class_conn2export(struct lustre_handle *conn)
704 {
705         struct obd_export *export;
706         ENTRY;
707
708         if (!conn) {
709                 CDEBUG(D_CACHE, "looking for null handle\n");
710                 RETURN(NULL);
711         }
712
713         if (conn->cookie == -1) {  /* this means assign a new connection */
714                 CDEBUG(D_CACHE, "want a new connection\n");
715                 RETURN(NULL);
716         }
717
718         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
719         export = class_handle2object(conn->cookie, NULL);
720         RETURN(export);
721 }
722 EXPORT_SYMBOL(class_conn2export);
723
724 struct obd_device *class_exp2obd(struct obd_export *exp)
725 {
726         if (exp)
727                 return exp->exp_obd;
728         return NULL;
729 }
730 EXPORT_SYMBOL(class_exp2obd);
731
732 struct obd_device *class_conn2obd(struct lustre_handle *conn)
733 {
734         struct obd_export *export;
735         export = class_conn2export(conn);
736         if (export) {
737                 struct obd_device *obd = export->exp_obd;
738                 class_export_put(export);
739                 return obd;
740         }
741         return NULL;
742 }
743 EXPORT_SYMBOL(class_conn2obd);
744
745 struct obd_import *class_exp2cliimp(struct obd_export *exp)
746 {
747         struct obd_device *obd = exp->exp_obd;
748         if (obd == NULL)
749                 return NULL;
750         return obd->u.cli.cl_import;
751 }
752 EXPORT_SYMBOL(class_exp2cliimp);
753
754 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
755 {
756         struct obd_device *obd = class_conn2obd(conn);
757         if (obd == NULL)
758                 return NULL;
759         return obd->u.cli.cl_import;
760 }
761 EXPORT_SYMBOL(class_conn2cliimp);
762
763 /* Export management functions */
764 static void class_export_destroy(struct obd_export *exp)
765 {
766         struct obd_device *obd = exp->exp_obd;
767         ENTRY;
768
769         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
770         LASSERT(obd != NULL);
771
772         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
773                exp->exp_client_uuid.uuid, obd->obd_name);
774
775         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
776         if (exp->exp_connection)
777                 ptlrpc_put_connection_superhack(exp->exp_connection);
778
779         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
780         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
781         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
782         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
783         obd_destroy_export(exp);
784         class_decref(obd, "export", exp);
785
786         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
787         EXIT;
788 }
789
790 static void export_handle_addref(void *export)
791 {
792         class_export_get(export);
793 }
794
795 static struct portals_handle_ops export_handle_ops = {
796         .hop_addref = export_handle_addref,
797         .hop_free   = NULL,
798 };
799
800 struct obd_export *class_export_get(struct obd_export *exp)
801 {
802         cfs_atomic_inc(&exp->exp_refcount);
803         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
804                cfs_atomic_read(&exp->exp_refcount));
805         return exp;
806 }
807 EXPORT_SYMBOL(class_export_get);
808
809 void class_export_put(struct obd_export *exp)
810 {
811         LASSERT(exp != NULL);
812         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
813         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
814                cfs_atomic_read(&exp->exp_refcount) - 1);
815
816         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
817                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
818                 CDEBUG(D_IOCTL, "final put %p/%s\n",
819                        exp, exp->exp_client_uuid.uuid);
820
821                 /* release nid stat refererence */
822                 lprocfs_exp_cleanup(exp);
823
824                 obd_zombie_export_add(exp);
825         }
826 }
827 EXPORT_SYMBOL(class_export_put);
828
829 /* Creates a new export, adds it to the hash table, and returns a
830  * pointer to it. The refcount is 2: one for the hash reference, and
831  * one for the pointer returned by this function. */
832 struct obd_export *class_new_export(struct obd_device *obd,
833                                     struct obd_uuid *cluuid)
834 {
835         struct obd_export *export;
836         cfs_hash_t *hash = NULL;
837         int rc = 0;
838         ENTRY;
839
840         OBD_ALLOC_PTR(export);
841         if (!export)
842                 return ERR_PTR(-ENOMEM);
843
844         export->exp_conn_cnt = 0;
845         export->exp_lock_hash = NULL;
846         export->exp_flock_hash = NULL;
847         cfs_atomic_set(&export->exp_refcount, 2);
848         cfs_atomic_set(&export->exp_rpc_count, 0);
849         cfs_atomic_set(&export->exp_cb_count, 0);
850         cfs_atomic_set(&export->exp_locks_count, 0);
851 #if LUSTRE_TRACKS_LOCK_EXP_REFS
852         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
853         spin_lock_init(&export->exp_locks_list_guard);
854 #endif
855         cfs_atomic_set(&export->exp_replay_count, 0);
856         export->exp_obd = obd;
857         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
858         spin_lock_init(&export->exp_uncommitted_replies_lock);
859         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
860         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
861         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
862         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
863         class_handle_hash(&export->exp_handle, &export_handle_ops);
864         export->exp_last_request_time = cfs_time_current_sec();
865         spin_lock_init(&export->exp_lock);
866         spin_lock_init(&export->exp_rpc_lock);
867         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
868         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
869         spin_lock_init(&export->exp_bl_list_lock);
870         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
871
872         export->exp_sp_peer = LUSTRE_SP_ANY;
873         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
874         export->exp_client_uuid = *cluuid;
875         obd_init_export(export);
876
877         spin_lock(&obd->obd_dev_lock);
878         /* shouldn't happen, but might race */
879         if (obd->obd_stopping)
880                 GOTO(exit_unlock, rc = -ENODEV);
881
882         hash = cfs_hash_getref(obd->obd_uuid_hash);
883         if (hash == NULL)
884                 GOTO(exit_unlock, rc = -ENODEV);
885         spin_unlock(&obd->obd_dev_lock);
886
887         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
888                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
889                 if (rc != 0) {
890                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
891                                       obd->obd_name, cluuid->uuid, rc);
892                         GOTO(exit_err, rc = -EALREADY);
893                 }
894         }
895
896         spin_lock(&obd->obd_dev_lock);
897         if (obd->obd_stopping) {
898                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
899                 GOTO(exit_unlock, rc = -ENODEV);
900         }
901
902         class_incref(obd, "export", export);
903         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
904         cfs_list_add_tail(&export->exp_obd_chain_timed,
905                           &export->exp_obd->obd_exports_timed);
906         export->exp_obd->obd_num_exports++;
907         spin_unlock(&obd->obd_dev_lock);
908         cfs_hash_putref(hash);
909         RETURN(export);
910
911 exit_unlock:
912         spin_unlock(&obd->obd_dev_lock);
913 exit_err:
914         if (hash)
915                 cfs_hash_putref(hash);
916         class_handle_unhash(&export->exp_handle);
917         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
918         obd_destroy_export(export);
919         OBD_FREE_PTR(export);
920         return ERR_PTR(rc);
921 }
922 EXPORT_SYMBOL(class_new_export);
923
924 void class_unlink_export(struct obd_export *exp)
925 {
926         class_handle_unhash(&exp->exp_handle);
927
928         spin_lock(&exp->exp_obd->obd_dev_lock);
929         /* delete an uuid-export hashitem from hashtables */
930         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
931                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
932                              &exp->exp_client_uuid,
933                              &exp->exp_uuid_hash);
934
935         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
936         cfs_list_del_init(&exp->exp_obd_chain_timed);
937         exp->exp_obd->obd_num_exports--;
938         spin_unlock(&exp->exp_obd->obd_dev_lock);
939         class_export_put(exp);
940 }
941 EXPORT_SYMBOL(class_unlink_export);
942
943 /* Import management functions */
944 void class_import_destroy(struct obd_import *imp)
945 {
946         ENTRY;
947
948         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
949                 imp->imp_obd->obd_name);
950
951         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
952
953         ptlrpc_put_connection_superhack(imp->imp_connection);
954
955         while (!cfs_list_empty(&imp->imp_conn_list)) {
956                 struct obd_import_conn *imp_conn;
957
958                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
959                                           struct obd_import_conn, oic_item);
960                 cfs_list_del_init(&imp_conn->oic_item);
961                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
962                 OBD_FREE(imp_conn, sizeof(*imp_conn));
963         }
964
965         LASSERT(imp->imp_sec == NULL);
966         class_decref(imp->imp_obd, "import", imp);
967         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
968         EXIT;
969 }
970
971 static void import_handle_addref(void *import)
972 {
973         class_import_get(import);
974 }
975
976 static struct portals_handle_ops import_handle_ops = {
977         .hop_addref = import_handle_addref,
978         .hop_free   = NULL,
979 };
980
981 struct obd_import *class_import_get(struct obd_import *import)
982 {
983         cfs_atomic_inc(&import->imp_refcount);
984         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
985                cfs_atomic_read(&import->imp_refcount),
986                import->imp_obd->obd_name);
987         return import;
988 }
989 EXPORT_SYMBOL(class_import_get);
990
991 void class_import_put(struct obd_import *imp)
992 {
993         ENTRY;
994
995         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
996         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
997
998         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
999                cfs_atomic_read(&imp->imp_refcount) - 1,
1000                imp->imp_obd->obd_name);
1001
1002         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1003                 CDEBUG(D_INFO, "final put import %p\n", imp);
1004                 obd_zombie_import_add(imp);
1005         }
1006
1007         /* catch possible import put race */
1008         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1009         EXIT;
1010 }
1011 EXPORT_SYMBOL(class_import_put);
1012
1013 static void init_imp_at(struct imp_at *at) {
1014         int i;
1015         at_init(&at->iat_net_latency, 0, 0);
1016         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1017                 /* max service estimates are tracked on the server side, so
1018                    don't use the AT history here, just use the last reported
1019                    val. (But keep hist for proc histogram, worst_ever) */
1020                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1021                         AT_FLG_NOHIST);
1022         }
1023 }
1024
1025 struct obd_import *class_new_import(struct obd_device *obd)
1026 {
1027         struct obd_import *imp;
1028
1029         OBD_ALLOC(imp, sizeof(*imp));
1030         if (imp == NULL)
1031                 return NULL;
1032
1033         CFS_INIT_LIST_HEAD(&imp->imp_pinger_chain);
1034         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1035         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1036         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1037         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1038         spin_lock_init(&imp->imp_lock);
1039         imp->imp_last_success_conn = 0;
1040         imp->imp_state = LUSTRE_IMP_NEW;
1041         imp->imp_obd = class_incref(obd, "import", imp);
1042         mutex_init(&imp->imp_sec_mutex);
1043         cfs_waitq_init(&imp->imp_recovery_waitq);
1044
1045         cfs_atomic_set(&imp->imp_refcount, 2);
1046         cfs_atomic_set(&imp->imp_unregistering, 0);
1047         cfs_atomic_set(&imp->imp_inflight, 0);
1048         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1049         cfs_atomic_set(&imp->imp_inval_count, 0);
1050         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1051         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1052         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1053         init_imp_at(&imp->imp_at);
1054
1055         /* the default magic is V2, will be used in connect RPC, and
1056          * then adjusted according to the flags in request/reply. */
1057         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1058
1059         return imp;
1060 }
1061 EXPORT_SYMBOL(class_new_import);
1062
1063 void class_destroy_import(struct obd_import *import)
1064 {
1065         LASSERT(import != NULL);
1066         LASSERT(import != LP_POISON);
1067
1068         class_handle_unhash(&import->imp_handle);
1069
1070         spin_lock(&import->imp_lock);
1071         import->imp_generation++;
1072         spin_unlock(&import->imp_lock);
1073         class_import_put(import);
1074 }
1075 EXPORT_SYMBOL(class_destroy_import);
1076
1077 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1078
1079 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1080 {
1081         spin_lock(&exp->exp_locks_list_guard);
1082
1083         LASSERT(lock->l_exp_refs_nr >= 0);
1084
1085         if (lock->l_exp_refs_target != NULL &&
1086             lock->l_exp_refs_target != exp) {
1087                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1088                               exp, lock, lock->l_exp_refs_target);
1089         }
1090         if ((lock->l_exp_refs_nr ++) == 0) {
1091                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1092                 lock->l_exp_refs_target = exp;
1093         }
1094         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1095                lock, exp, lock->l_exp_refs_nr);
1096         spin_unlock(&exp->exp_locks_list_guard);
1097 }
1098 EXPORT_SYMBOL(__class_export_add_lock_ref);
1099
1100 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1101 {
1102         spin_lock(&exp->exp_locks_list_guard);
1103         LASSERT(lock->l_exp_refs_nr > 0);
1104         if (lock->l_exp_refs_target != exp) {
1105                 LCONSOLE_WARN("lock %p, "
1106                               "mismatching export pointers: %p, %p\n",
1107                               lock, lock->l_exp_refs_target, exp);
1108         }
1109         if (-- lock->l_exp_refs_nr == 0) {
1110                 cfs_list_del_init(&lock->l_exp_refs_link);
1111                 lock->l_exp_refs_target = NULL;
1112         }
1113         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1114                lock, exp, lock->l_exp_refs_nr);
1115         spin_unlock(&exp->exp_locks_list_guard);
1116 }
1117 EXPORT_SYMBOL(__class_export_del_lock_ref);
1118 #endif
1119
1120 /* A connection defines an export context in which preallocation can
1121    be managed. This releases the export pointer reference, and returns
1122    the export handle, so the export refcount is 1 when this function
1123    returns. */
1124 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1125                   struct obd_uuid *cluuid)
1126 {
1127         struct obd_export *export;
1128         LASSERT(conn != NULL);
1129         LASSERT(obd != NULL);
1130         LASSERT(cluuid != NULL);
1131         ENTRY;
1132
1133         export = class_new_export(obd, cluuid);
1134         if (IS_ERR(export))
1135                 RETURN(PTR_ERR(export));
1136
1137         conn->cookie = export->exp_handle.h_cookie;
1138         class_export_put(export);
1139
1140         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1141                cluuid->uuid, conn->cookie);
1142         RETURN(0);
1143 }
1144 EXPORT_SYMBOL(class_connect);
1145
1146 /* if export is involved in recovery then clean up related things */
1147 void class_export_recovery_cleanup(struct obd_export *exp)
1148 {
1149         struct obd_device *obd = exp->exp_obd;
1150
1151         spin_lock(&obd->obd_recovery_task_lock);
1152         if (exp->exp_delayed)
1153                 obd->obd_delayed_clients--;
1154         if (obd->obd_recovering) {
1155                 if (exp->exp_in_recovery) {
1156                         spin_lock(&exp->exp_lock);
1157                         exp->exp_in_recovery = 0;
1158                         spin_unlock(&exp->exp_lock);
1159                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1160                         cfs_atomic_dec(&obd->obd_connected_clients);
1161                 }
1162
1163                 /* if called during recovery then should update
1164                  * obd_stale_clients counter,
1165                  * lightweight exports are not counted */
1166                 if (exp->exp_failed &&
1167                     (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1168                         exp->exp_obd->obd_stale_clients++;
1169         }
1170         spin_unlock(&obd->obd_recovery_task_lock);
1171         /** Cleanup req replay fields */
1172         if (exp->exp_req_replay_needed) {
1173                 spin_lock(&exp->exp_lock);
1174                 exp->exp_req_replay_needed = 0;
1175                 spin_unlock(&exp->exp_lock);
1176                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1177                 cfs_atomic_dec(&obd->obd_req_replay_clients);
1178         }
1179         /** Cleanup lock replay data */
1180         if (exp->exp_lock_replay_needed) {
1181                 spin_lock(&exp->exp_lock);
1182                 exp->exp_lock_replay_needed = 0;
1183                 spin_unlock(&exp->exp_lock);
1184                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1185                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1186         }
1187 }
1188
1189 /* This function removes 1-3 references from the export:
1190  * 1 - for export pointer passed
1191  * and if disconnect really need
1192  * 2 - removing from hash
1193  * 3 - in client_unlink_export
1194  * The export pointer passed to this function can destroyed */
1195 int class_disconnect(struct obd_export *export)
1196 {
1197         int already_disconnected;
1198         ENTRY;
1199
1200         if (export == NULL) {
1201                 CWARN("attempting to free NULL export %p\n", export);
1202                 RETURN(-EINVAL);
1203         }
1204
1205         spin_lock(&export->exp_lock);
1206         already_disconnected = export->exp_disconnected;
1207         export->exp_disconnected = 1;
1208         spin_unlock(&export->exp_lock);
1209
1210         /* class_cleanup(), abort_recovery(), and class_fail_export()
1211          * all end up in here, and if any of them race we shouldn't
1212          * call extra class_export_puts(). */
1213         if (already_disconnected) {
1214                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1215                 GOTO(no_disconn, already_disconnected);
1216         }
1217
1218         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1219                export->exp_handle.h_cookie);
1220
1221         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1222                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1223                              &export->exp_connection->c_peer.nid,
1224                              &export->exp_nid_hash);
1225
1226         class_export_recovery_cleanup(export);
1227         class_unlink_export(export);
1228 no_disconn:
1229         class_export_put(export);
1230         RETURN(0);
1231 }
1232 EXPORT_SYMBOL(class_disconnect);
1233
1234 /* Return non-zero for a fully connected export */
1235 int class_connected_export(struct obd_export *exp)
1236 {
1237         if (exp) {
1238                 int connected;
1239                 spin_lock(&exp->exp_lock);
1240                 connected = (exp->exp_conn_cnt > 0);
1241                 spin_unlock(&exp->exp_lock);
1242                 return connected;
1243         }
1244         return 0;
1245 }
1246 EXPORT_SYMBOL(class_connected_export);
1247
1248 static void class_disconnect_export_list(cfs_list_t *list,
1249                                          enum obd_option flags)
1250 {
1251         int rc;
1252         struct obd_export *exp;
1253         ENTRY;
1254
1255         /* It's possible that an export may disconnect itself, but
1256          * nothing else will be added to this list. */
1257         while (!cfs_list_empty(list)) {
1258                 exp = cfs_list_entry(list->next, struct obd_export,
1259                                      exp_obd_chain);
1260                 /* need for safe call CDEBUG after obd_disconnect */
1261                 class_export_get(exp);
1262
1263                 spin_lock(&exp->exp_lock);
1264                 exp->exp_flags = flags;
1265                 spin_unlock(&exp->exp_lock);
1266
1267                 if (obd_uuid_equals(&exp->exp_client_uuid,
1268                                     &exp->exp_obd->obd_uuid)) {
1269                         CDEBUG(D_HA,
1270                                "exp %p export uuid == obd uuid, don't discon\n",
1271                                exp);
1272                         /* Need to delete this now so we don't end up pointing
1273                          * to work_list later when this export is cleaned up. */
1274                         cfs_list_del_init(&exp->exp_obd_chain);
1275                         class_export_put(exp);
1276                         continue;
1277                 }
1278
1279                 class_export_get(exp);
1280                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1281                        "last request at "CFS_TIME_T"\n",
1282                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1283                        exp, exp->exp_last_request_time);
1284                 /* release one export reference anyway */
1285                 rc = obd_disconnect(exp);
1286
1287                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1288                        obd_export_nid2str(exp), exp, rc);
1289                 class_export_put(exp);
1290         }
1291         EXIT;
1292 }
1293
1294 void class_disconnect_exports(struct obd_device *obd)
1295 {
1296         cfs_list_t work_list;
1297         ENTRY;
1298
1299         /* Move all of the exports from obd_exports to a work list, en masse. */
1300         CFS_INIT_LIST_HEAD(&work_list);
1301         spin_lock(&obd->obd_dev_lock);
1302         cfs_list_splice_init(&obd->obd_exports, &work_list);
1303         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1304         spin_unlock(&obd->obd_dev_lock);
1305
1306         if (!cfs_list_empty(&work_list)) {
1307                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1308                        "disconnecting them\n", obd->obd_minor, obd);
1309                 class_disconnect_export_list(&work_list,
1310                                              exp_flags_from_obd(obd));
1311         } else
1312                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1313                        obd->obd_minor, obd);
1314         EXIT;
1315 }
1316 EXPORT_SYMBOL(class_disconnect_exports);
1317
1318 /* Remove exports that have not completed recovery.
1319  */
1320 void class_disconnect_stale_exports(struct obd_device *obd,
1321                                     int (*test_export)(struct obd_export *))
1322 {
1323         cfs_list_t work_list;
1324         struct obd_export *exp, *n;
1325         int evicted = 0;
1326         ENTRY;
1327
1328         CFS_INIT_LIST_HEAD(&work_list);
1329         spin_lock(&obd->obd_dev_lock);
1330         cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1331                                      exp_obd_chain) {
1332                 /* don't count self-export as client */
1333                 if (obd_uuid_equals(&exp->exp_client_uuid,
1334                                     &exp->exp_obd->obd_uuid))
1335                         continue;
1336
1337                 /* don't evict clients which have no slot in last_rcvd
1338                  * (e.g. lightweight connection) */
1339                 if (exp->exp_target_data.ted_lr_idx == -1)
1340                         continue;
1341
1342                 spin_lock(&exp->exp_lock);
1343                 if (exp->exp_failed || test_export(exp)) {
1344                         spin_unlock(&exp->exp_lock);
1345                         continue;
1346                 }
1347                 exp->exp_failed = 1;
1348                 spin_unlock(&exp->exp_lock);
1349
1350                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1351                 evicted++;
1352                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1353                        obd->obd_name, exp->exp_client_uuid.uuid,
1354                        exp->exp_connection == NULL ? "<unknown>" :
1355                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1356                 print_export_data(exp, "EVICTING", 0);
1357         }
1358         spin_unlock(&obd->obd_dev_lock);
1359
1360         if (evicted)
1361                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1362                               obd->obd_name, evicted);
1363
1364         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1365                                                  OBD_OPT_ABORT_RECOV);
1366         EXIT;
1367 }
1368 EXPORT_SYMBOL(class_disconnect_stale_exports);
1369
1370 void class_fail_export(struct obd_export *exp)
1371 {
1372         int rc, already_failed;
1373
1374         spin_lock(&exp->exp_lock);
1375         already_failed = exp->exp_failed;
1376         exp->exp_failed = 1;
1377         spin_unlock(&exp->exp_lock);
1378
1379         if (already_failed) {
1380                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1381                        exp, exp->exp_client_uuid.uuid);
1382                 return;
1383         }
1384
1385         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1386                exp, exp->exp_client_uuid.uuid);
1387
1388         if (obd_dump_on_timeout)
1389                 libcfs_debug_dumplog();
1390
1391         /* need for safe call CDEBUG after obd_disconnect */
1392         class_export_get(exp);
1393
1394         /* Most callers into obd_disconnect are removing their own reference
1395          * (request, for example) in addition to the one from the hash table.
1396          * We don't have such a reference here, so make one. */
1397         class_export_get(exp);
1398         rc = obd_disconnect(exp);
1399         if (rc)
1400                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1401         else
1402                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1403                        exp, exp->exp_client_uuid.uuid);
1404         class_export_put(exp);
1405 }
1406 EXPORT_SYMBOL(class_fail_export);
1407
1408 char *obd_export_nid2str(struct obd_export *exp)
1409 {
1410         if (exp->exp_connection != NULL)
1411                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1412
1413         return "(no nid)";
1414 }
1415 EXPORT_SYMBOL(obd_export_nid2str);
1416
1417 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1418 {
1419         cfs_hash_t *nid_hash;
1420         struct obd_export *doomed_exp = NULL;
1421         int exports_evicted = 0;
1422
1423         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1424
1425         spin_lock(&obd->obd_dev_lock);
1426         /* umount has run already, so evict thread should leave
1427          * its task to umount thread now */
1428         if (obd->obd_stopping) {
1429                 spin_unlock(&obd->obd_dev_lock);
1430                 return exports_evicted;
1431         }
1432         nid_hash = obd->obd_nid_hash;
1433         cfs_hash_getref(nid_hash);
1434         spin_unlock(&obd->obd_dev_lock);
1435
1436         do {
1437                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1438                 if (doomed_exp == NULL)
1439                         break;
1440
1441                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1442                          "nid %s found, wanted nid %s, requested nid %s\n",
1443                          obd_export_nid2str(doomed_exp),
1444                          libcfs_nid2str(nid_key), nid);
1445                 LASSERTF(doomed_exp != obd->obd_self_export,
1446                          "self-export is hashed by NID?\n");
1447                 exports_evicted++;
1448                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1449                               "request\n", obd->obd_name,
1450                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1451                               obd_export_nid2str(doomed_exp));
1452                 class_fail_export(doomed_exp);
1453                 class_export_put(doomed_exp);
1454         } while (1);
1455
1456         cfs_hash_putref(nid_hash);
1457
1458         if (!exports_evicted)
1459                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1460                        obd->obd_name, nid);
1461         return exports_evicted;
1462 }
1463 EXPORT_SYMBOL(obd_export_evict_by_nid);
1464
1465 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1466 {
1467         cfs_hash_t *uuid_hash;
1468         struct obd_export *doomed_exp = NULL;
1469         struct obd_uuid doomed_uuid;
1470         int exports_evicted = 0;
1471
1472         spin_lock(&obd->obd_dev_lock);
1473         if (obd->obd_stopping) {
1474                 spin_unlock(&obd->obd_dev_lock);
1475                 return exports_evicted;
1476         }
1477         uuid_hash = obd->obd_uuid_hash;
1478         cfs_hash_getref(uuid_hash);
1479         spin_unlock(&obd->obd_dev_lock);
1480
1481         obd_str2uuid(&doomed_uuid, uuid);
1482         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1483                 CERROR("%s: can't evict myself\n", obd->obd_name);
1484                 cfs_hash_putref(uuid_hash);
1485                 return exports_evicted;
1486         }
1487
1488         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1489
1490         if (doomed_exp == NULL) {
1491                 CERROR("%s: can't disconnect %s: no exports found\n",
1492                        obd->obd_name, uuid);
1493         } else {
1494                 CWARN("%s: evicting %s at adminstrative request\n",
1495                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1496                 class_fail_export(doomed_exp);
1497                 class_export_put(doomed_exp);
1498                 exports_evicted++;
1499         }
1500         cfs_hash_putref(uuid_hash);
1501
1502         return exports_evicted;
1503 }
1504 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1505
1506 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1507 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1508 EXPORT_SYMBOL(class_export_dump_hook);
1509 #endif
1510
1511 static void print_export_data(struct obd_export *exp, const char *status,
1512                               int locks)
1513 {
1514         struct ptlrpc_reply_state *rs;
1515         struct ptlrpc_reply_state *first_reply = NULL;
1516         int nreplies = 0;
1517
1518         spin_lock(&exp->exp_lock);
1519         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1520                                 rs_exp_list) {
1521                 if (nreplies == 0)
1522                         first_reply = rs;
1523                 nreplies++;
1524         }
1525         spin_unlock(&exp->exp_lock);
1526
1527         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1528                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1529                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1530                cfs_atomic_read(&exp->exp_rpc_count),
1531                cfs_atomic_read(&exp->exp_cb_count),
1532                cfs_atomic_read(&exp->exp_locks_count),
1533                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1534                nreplies, first_reply, nreplies > 3 ? "..." : "",
1535                exp->exp_last_committed);
1536 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1537         if (locks && class_export_dump_hook != NULL)
1538                 class_export_dump_hook(exp);
1539 #endif
1540 }
1541
1542 void dump_exports(struct obd_device *obd, int locks)
1543 {
1544         struct obd_export *exp;
1545
1546         spin_lock(&obd->obd_dev_lock);
1547         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1548                 print_export_data(exp, "ACTIVE", locks);
1549         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1550                 print_export_data(exp, "UNLINKED", locks);
1551         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1552                 print_export_data(exp, "DELAYED", locks);
1553         spin_unlock(&obd->obd_dev_lock);
1554         spin_lock(&obd_zombie_impexp_lock);
1555         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1556                 print_export_data(exp, "ZOMBIE", locks);
1557         spin_unlock(&obd_zombie_impexp_lock);
1558 }
1559 EXPORT_SYMBOL(dump_exports);
1560
1561 void obd_exports_barrier(struct obd_device *obd)
1562 {
1563         int waited = 2;
1564         LASSERT(cfs_list_empty(&obd->obd_exports));
1565         spin_lock(&obd->obd_dev_lock);
1566         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1567                 spin_unlock(&obd->obd_dev_lock);
1568                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1569                                                    cfs_time_seconds(waited));
1570                 if (waited > 5 && IS_PO2(waited)) {
1571                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1572                                       "more than %d seconds. "
1573                                       "The obd refcount = %d. Is it stuck?\n",
1574                                       obd->obd_name, waited,
1575                                       cfs_atomic_read(&obd->obd_refcount));
1576                         dump_exports(obd, 1);
1577                 }
1578                 waited *= 2;
1579                 spin_lock(&obd->obd_dev_lock);
1580         }
1581         spin_unlock(&obd->obd_dev_lock);
1582 }
1583 EXPORT_SYMBOL(obd_exports_barrier);
1584
1585 /* Total amount of zombies to be destroyed */
1586 static int zombies_count = 0;
1587
1588 /**
1589  * kill zombie imports and exports
1590  */
1591 void obd_zombie_impexp_cull(void)
1592 {
1593         struct obd_import *import;
1594         struct obd_export *export;
1595         ENTRY;
1596
1597         do {
1598                 spin_lock(&obd_zombie_impexp_lock);
1599
1600                 import = NULL;
1601                 if (!cfs_list_empty(&obd_zombie_imports)) {
1602                         import = cfs_list_entry(obd_zombie_imports.next,
1603                                                 struct obd_import,
1604                                                 imp_zombie_chain);
1605                         cfs_list_del_init(&import->imp_zombie_chain);
1606                 }
1607
1608                 export = NULL;
1609                 if (!cfs_list_empty(&obd_zombie_exports)) {
1610                         export = cfs_list_entry(obd_zombie_exports.next,
1611                                                 struct obd_export,
1612                                                 exp_obd_chain);
1613                         cfs_list_del_init(&export->exp_obd_chain);
1614                 }
1615
1616                 spin_unlock(&obd_zombie_impexp_lock);
1617
1618                 if (import != NULL) {
1619                         class_import_destroy(import);
1620                         spin_lock(&obd_zombie_impexp_lock);
1621                         zombies_count--;
1622                         spin_unlock(&obd_zombie_impexp_lock);
1623                 }
1624
1625                 if (export != NULL) {
1626                         class_export_destroy(export);
1627                         spin_lock(&obd_zombie_impexp_lock);
1628                         zombies_count--;
1629                         spin_unlock(&obd_zombie_impexp_lock);
1630                 }
1631
1632                 cfs_cond_resched();
1633         } while (import != NULL || export != NULL);
1634         EXIT;
1635 }
1636
1637 static struct completion        obd_zombie_start;
1638 static struct completion        obd_zombie_stop;
1639 static unsigned long            obd_zombie_flags;
1640 static cfs_waitq_t              obd_zombie_waitq;
1641 static pid_t                    obd_zombie_pid;
1642
1643 enum {
1644         OBD_ZOMBIE_STOP         = 0x0001,
1645 };
1646
1647 /**
1648  * check for work for kill zombie import/export thread.
1649  */
1650 static int obd_zombie_impexp_check(void *arg)
1651 {
1652         int rc;
1653
1654         spin_lock(&obd_zombie_impexp_lock);
1655         rc = (zombies_count == 0) &&
1656              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1657         spin_unlock(&obd_zombie_impexp_lock);
1658
1659         RETURN(rc);
1660 }
1661
1662 /**
1663  * Add export to the obd_zombe thread and notify it.
1664  */
1665 static void obd_zombie_export_add(struct obd_export *exp) {
1666         spin_lock(&exp->exp_obd->obd_dev_lock);
1667         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1668         cfs_list_del_init(&exp->exp_obd_chain);
1669         spin_unlock(&exp->exp_obd->obd_dev_lock);
1670         spin_lock(&obd_zombie_impexp_lock);
1671         zombies_count++;
1672         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1673         spin_unlock(&obd_zombie_impexp_lock);
1674
1675         obd_zombie_impexp_notify();
1676 }
1677
1678 /**
1679  * Add import to the obd_zombe thread and notify it.
1680  */
1681 static void obd_zombie_import_add(struct obd_import *imp) {
1682         LASSERT(imp->imp_sec == NULL);
1683         LASSERT(imp->imp_rq_pool == NULL);
1684         spin_lock(&obd_zombie_impexp_lock);
1685         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1686         zombies_count++;
1687         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1688         spin_unlock(&obd_zombie_impexp_lock);
1689
1690         obd_zombie_impexp_notify();
1691 }
1692
1693 /**
1694  * notify import/export destroy thread about new zombie.
1695  */
1696 static void obd_zombie_impexp_notify(void)
1697 {
1698         /*
1699          * Make sure obd_zomebie_impexp_thread get this notification.
1700          * It is possible this signal only get by obd_zombie_barrier, and
1701          * barrier gulps this notification and sleeps away and hangs ensues
1702          */
1703         cfs_waitq_broadcast(&obd_zombie_waitq);
1704 }
1705
1706 /**
1707  * check whether obd_zombie is idle
1708  */
1709 static int obd_zombie_is_idle(void)
1710 {
1711         int rc;
1712
1713         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1714         spin_lock(&obd_zombie_impexp_lock);
1715         rc = (zombies_count == 0);
1716         spin_unlock(&obd_zombie_impexp_lock);
1717         return rc;
1718 }
1719
1720 /**
1721  * wait when obd_zombie import/export queues become empty
1722  */
1723 void obd_zombie_barrier(void)
1724 {
1725         struct l_wait_info lwi = { 0 };
1726
1727         if (obd_zombie_pid == cfs_curproc_pid())
1728                 /* don't wait for myself */
1729                 return;
1730         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1731 }
1732 EXPORT_SYMBOL(obd_zombie_barrier);
1733
1734 #ifdef __KERNEL__
1735
1736 /**
1737  * destroy zombie export/import thread.
1738  */
1739 static int obd_zombie_impexp_thread(void *unused)
1740 {
1741         unshare_fs_struct();
1742         complete(&obd_zombie_start);
1743
1744         obd_zombie_pid = cfs_curproc_pid();
1745
1746         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1747                 struct l_wait_info lwi = { 0 };
1748
1749                 l_wait_event(obd_zombie_waitq,
1750                              !obd_zombie_impexp_check(NULL), &lwi);
1751                 obd_zombie_impexp_cull();
1752
1753                 /*
1754                  * Notify obd_zombie_barrier callers that queues
1755                  * may be empty.
1756                  */
1757                 cfs_waitq_signal(&obd_zombie_waitq);
1758         }
1759
1760         complete(&obd_zombie_stop);
1761
1762         RETURN(0);
1763 }
1764
1765 #else /* ! KERNEL */
1766
1767 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1768 static void *obd_zombie_impexp_work_cb;
1769 static void *obd_zombie_impexp_idle_cb;
1770
1771 int obd_zombie_impexp_kill(void *arg)
1772 {
1773         int rc = 0;
1774
1775         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1776                 obd_zombie_impexp_cull();
1777                 rc = 1;
1778         }
1779         cfs_atomic_dec(&zombie_recur);
1780         return rc;
1781 }
1782
1783 #endif
1784
1785 /**
1786  * start destroy zombie import/export thread
1787  */
1788 int obd_zombie_impexp_init(void)
1789 {
1790 #ifdef __KERNEL__
1791         cfs_task_t *task;
1792 #endif
1793
1794         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1795         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1796         spin_lock_init(&obd_zombie_impexp_lock);
1797         init_completion(&obd_zombie_start);
1798         init_completion(&obd_zombie_stop);
1799         cfs_waitq_init(&obd_zombie_waitq);
1800         obd_zombie_pid = 0;
1801
1802 #ifdef __KERNEL__
1803         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1804         if (IS_ERR(task))
1805                 RETURN(PTR_ERR(task));
1806
1807         wait_for_completion(&obd_zombie_start);
1808 #else
1809
1810         obd_zombie_impexp_work_cb =
1811                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1812                                                  &obd_zombie_impexp_kill, NULL);
1813
1814         obd_zombie_impexp_idle_cb =
1815                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1816                                                  &obd_zombie_impexp_check, NULL);
1817 #endif
1818         RETURN(0);
1819 }
1820 /**
1821  * stop destroy zombie import/export thread
1822  */
1823 void obd_zombie_impexp_stop(void)
1824 {
1825         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1826         obd_zombie_impexp_notify();
1827 #ifdef __KERNEL__
1828         wait_for_completion(&obd_zombie_stop);
1829 #else
1830         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1831         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1832 #endif
1833 }
1834
1835 /***** Kernel-userspace comm helpers *******/
1836
1837 /* Get length of entire message, including header */
1838 int kuc_len(int payload_len)
1839 {
1840         return sizeof(struct kuc_hdr) + payload_len;
1841 }
1842 EXPORT_SYMBOL(kuc_len);
1843
1844 /* Get a pointer to kuc header, given a ptr to the payload
1845  * @param p Pointer to payload area
1846  * @returns Pointer to kuc header
1847  */
1848 struct kuc_hdr * kuc_ptr(void *p)
1849 {
1850         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1851         LASSERT(lh->kuc_magic == KUC_MAGIC);
1852         return lh;
1853 }
1854 EXPORT_SYMBOL(kuc_ptr);
1855
1856 /* Test if payload is part of kuc message
1857  * @param p Pointer to payload area
1858  * @returns boolean
1859  */
1860 int kuc_ispayload(void *p)
1861 {
1862         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1863
1864         if (kh->kuc_magic == KUC_MAGIC)
1865                 return 1;
1866         else
1867                 return 0;
1868 }
1869 EXPORT_SYMBOL(kuc_ispayload);
1870
1871 /* Alloc space for a message, and fill in header
1872  * @return Pointer to payload area
1873  */
1874 void *kuc_alloc(int payload_len, int transport, int type)
1875 {
1876         struct kuc_hdr *lh;
1877         int len = kuc_len(payload_len);
1878
1879         OBD_ALLOC(lh, len);
1880         if (lh == NULL)
1881                 return ERR_PTR(-ENOMEM);
1882
1883         lh->kuc_magic = KUC_MAGIC;
1884         lh->kuc_transport = transport;
1885         lh->kuc_msgtype = type;
1886         lh->kuc_msglen = len;
1887
1888         return (void *)(lh + 1);
1889 }
1890 EXPORT_SYMBOL(kuc_alloc);
1891
1892 /* Takes pointer to payload area */
1893 inline void kuc_free(void *p, int payload_len)
1894 {
1895         struct kuc_hdr *lh = kuc_ptr(p);
1896         OBD_FREE(lh, kuc_len(payload_len));
1897 }
1898 EXPORT_SYMBOL(kuc_free);
1899
1900
1901