Whamcloud - gitweb
LU-793 ptlrpc: allow client to reconnect with RPC in progress
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
50 extern cfs_list_t obd_types;
51 spinlock_t obd_types_lock;
52
53 struct kmem_cache *obd_device_cachep;
54 struct kmem_cache *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 struct kmem_cache *import_cachep;
57
58 cfs_list_t      obd_zombie_imports;
59 cfs_list_t      obd_zombie_exports;
60 spinlock_t  obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, __GFP_IO);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         cfs_list_t *tmp;
102         struct obd_type *type;
103
104         spin_lock(&obd_types_lock);
105         cfs_list_for_each(tmp, &obd_types) {
106                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115 EXPORT_SYMBOL(class_search_type);
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124
125                 if (strcmp(modname, "obdfilter") == 0)
126                         modname = "ofd";
127
128                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129                         modname = LUSTRE_OSP_NAME;
130
131                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132                         modname = LUSTRE_MDT_NAME;
133
134                 if (!request_module("%s", modname)) {
135                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136                         type = class_search_type(name);
137                 } else {
138                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139                                            modname);
140                 }
141         }
142 #endif
143         if (type) {
144                 spin_lock(&type->obd_type_lock);
145                 type->typ_refcnt++;
146                 try_module_get(type->typ_dt_ops->o_owner);
147                 spin_unlock(&type->obd_type_lock);
148         }
149         return type;
150 }
151 EXPORT_SYMBOL(class_get_type);
152
153 void class_put_type(struct obd_type *type)
154 {
155         LASSERT(type);
156         spin_lock(&type->obd_type_lock);
157         type->typ_refcnt--;
158         module_put(type->typ_dt_ops->o_owner);
159         spin_unlock(&type->obd_type_lock);
160 }
161 EXPORT_SYMBOL(class_put_type);
162
163 #define CLASS_MAX_NAME 1024
164
165 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
166                         struct lprocfs_vars *vars, const char *name,
167                         struct lu_device_type *ldt)
168 {
169         struct obd_type *type;
170         int rc = 0;
171         ENTRY;
172
173         /* sanity check */
174         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
175
176         if (class_search_type(name)) {
177                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
178                 RETURN(-EEXIST);
179         }
180
181         rc = -ENOMEM;
182         OBD_ALLOC(type, sizeof(*type));
183         if (type == NULL)
184                 RETURN(rc);
185
186         OBD_ALLOC_PTR(type->typ_dt_ops);
187         OBD_ALLOC_PTR(type->typ_md_ops);
188         OBD_ALLOC(type->typ_name, strlen(name) + 1);
189
190         if (type->typ_dt_ops == NULL ||
191             type->typ_md_ops == NULL ||
192             type->typ_name == NULL)
193                 GOTO (failed, rc);
194
195         *(type->typ_dt_ops) = *dt_ops;
196         /* md_ops is optional */
197         if (md_ops)
198                 *(type->typ_md_ops) = *md_ops;
199         strcpy(type->typ_name, name);
200         spin_lock_init(&type->obd_type_lock);
201
202 #ifdef LPROCFS
203         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
204                                               vars, type);
205         if (IS_ERR(type->typ_procroot)) {
206                 rc = PTR_ERR(type->typ_procroot);
207                 type->typ_procroot = NULL;
208                 GOTO (failed, rc);
209         }
210 #endif
211         if (ldt != NULL) {
212                 type->typ_lu = ldt;
213                 rc = lu_device_type_init(ldt);
214                 if (rc != 0)
215                         GOTO (failed, rc);
216         }
217
218         spin_lock(&obd_types_lock);
219         cfs_list_add(&type->typ_chain, &obd_types);
220         spin_unlock(&obd_types_lock);
221
222         RETURN (0);
223
224  failed:
225         if (type->typ_name != NULL)
226                 OBD_FREE(type->typ_name, strlen(name) + 1);
227         if (type->typ_md_ops != NULL)
228                 OBD_FREE_PTR(type->typ_md_ops);
229         if (type->typ_dt_ops != NULL)
230                 OBD_FREE_PTR(type->typ_dt_ops);
231         OBD_FREE(type, sizeof(*type));
232         RETURN(rc);
233 }
234 EXPORT_SYMBOL(class_register_type);
235
236 int class_unregister_type(const char *name)
237 {
238         struct obd_type *type = class_search_type(name);
239         ENTRY;
240
241         if (!type) {
242                 CERROR("unknown obd type\n");
243                 RETURN(-EINVAL);
244         }
245
246         if (type->typ_refcnt) {
247                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
248                 /* This is a bad situation, let's make the best of it */
249                 /* Remove ops, but leave the name for debugging */
250                 OBD_FREE_PTR(type->typ_dt_ops);
251                 OBD_FREE_PTR(type->typ_md_ops);
252                 RETURN(-EBUSY);
253         }
254
255         /* we do not use type->typ_procroot as for compatibility purposes
256          * other modules can share names (i.e. lod can use lov entry). so
257          * we can't reference pointer as it can get invalided when another
258          * module removes the entry */
259         lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
260
261         if (type->typ_lu)
262                 lu_device_type_fini(type->typ_lu);
263
264         spin_lock(&obd_types_lock);
265         cfs_list_del(&type->typ_chain);
266         spin_unlock(&obd_types_lock);
267         OBD_FREE(type->typ_name, strlen(name) + 1);
268         if (type->typ_dt_ops != NULL)
269                 OBD_FREE_PTR(type->typ_dt_ops);
270         if (type->typ_md_ops != NULL)
271                 OBD_FREE_PTR(type->typ_md_ops);
272         OBD_FREE(type, sizeof(*type));
273         RETURN(0);
274 } /* class_unregister_type */
275 EXPORT_SYMBOL(class_unregister_type);
276
277 /**
278  * Create a new obd device.
279  *
280  * Find an empty slot in ::obd_devs[], create a new obd device in it.
281  *
282  * \param[in] type_name obd device type string.
283  * \param[in] name      obd device name.
284  *
285  * \retval NULL if create fails, otherwise return the obd device
286  *         pointer created.
287  */
288 struct obd_device *class_newdev(const char *type_name, const char *name)
289 {
290         struct obd_device *result = NULL;
291         struct obd_device *newdev;
292         struct obd_type *type = NULL;
293         int i;
294         int new_obd_minor = 0;
295         ENTRY;
296
297         if (strlen(name) >= MAX_OBD_NAME) {
298                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
299                 RETURN(ERR_PTR(-EINVAL));
300         }
301
302         type = class_get_type(type_name);
303         if (type == NULL){
304                 CERROR("OBD: unknown type: %s\n", type_name);
305                 RETURN(ERR_PTR(-ENODEV));
306         }
307
308         newdev = obd_device_alloc();
309         if (newdev == NULL)
310                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
311
312         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
313
314         write_lock(&obd_dev_lock);
315         for (i = 0; i < class_devno_max(); i++) {
316                 struct obd_device *obd = class_num2obd(i);
317
318                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
319                         CERROR("Device %s already exists at %d, won't add\n",
320                                name, i);
321                         if (result) {
322                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
323                                          "%p obd_magic %08x != %08x\n", result,
324                                          result->obd_magic, OBD_DEVICE_MAGIC);
325                                 LASSERTF(result->obd_minor == new_obd_minor,
326                                          "%p obd_minor %d != %d\n", result,
327                                          result->obd_minor, new_obd_minor);
328
329                                 obd_devs[result->obd_minor] = NULL;
330                                 result->obd_name[0]='\0';
331                          }
332                         result = ERR_PTR(-EEXIST);
333                         break;
334                 }
335                 if (!result && !obd) {
336                         result = newdev;
337                         result->obd_minor = i;
338                         new_obd_minor = i;
339                         result->obd_type = type;
340                         strncpy(result->obd_name, name,
341                                 sizeof(result->obd_name) - 1);
342                         obd_devs[i] = result;
343                 }
344         }
345         write_unlock(&obd_dev_lock);
346
347         if (result == NULL && i >= class_devno_max()) {
348                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
349                        class_devno_max());
350                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
351         }
352
353         if (IS_ERR(result))
354                 GOTO(out, result);
355
356         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
357                result->obd_name, result);
358
359         RETURN(result);
360 out:
361         obd_device_free(newdev);
362 out_type:
363         class_put_type(type);
364         return result;
365 }
366
367 void class_release_dev(struct obd_device *obd)
368 {
369         struct obd_type *obd_type = obd->obd_type;
370
371         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
372                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
373         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
374                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
375         LASSERT(obd_type != NULL);
376
377         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
378                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
379
380         write_lock(&obd_dev_lock);
381         obd_devs[obd->obd_minor] = NULL;
382         write_unlock(&obd_dev_lock);
383         obd_device_free(obd);
384
385         class_put_type(obd_type);
386 }
387
388 int class_name2dev(const char *name)
389 {
390         int i;
391
392         if (!name)
393                 return -1;
394
395         read_lock(&obd_dev_lock);
396         for (i = 0; i < class_devno_max(); i++) {
397                 struct obd_device *obd = class_num2obd(i);
398
399                 if (obd && strcmp(name, obd->obd_name) == 0) {
400                         /* Make sure we finished attaching before we give
401                            out any references */
402                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
403                         if (obd->obd_attached) {
404                                 read_unlock(&obd_dev_lock);
405                                 return i;
406                         }
407                         break;
408                 }
409         }
410         read_unlock(&obd_dev_lock);
411
412         return -1;
413 }
414 EXPORT_SYMBOL(class_name2dev);
415
416 struct obd_device *class_name2obd(const char *name)
417 {
418         int dev = class_name2dev(name);
419
420         if (dev < 0 || dev > class_devno_max())
421                 return NULL;
422         return class_num2obd(dev);
423 }
424 EXPORT_SYMBOL(class_name2obd);
425
426 int class_uuid2dev(struct obd_uuid *uuid)
427 {
428         int i;
429
430         read_lock(&obd_dev_lock);
431         for (i = 0; i < class_devno_max(); i++) {
432                 struct obd_device *obd = class_num2obd(i);
433
434                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
435                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
436                         read_unlock(&obd_dev_lock);
437                         return i;
438                 }
439         }
440         read_unlock(&obd_dev_lock);
441
442         return -1;
443 }
444 EXPORT_SYMBOL(class_uuid2dev);
445
446 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
447 {
448         int dev = class_uuid2dev(uuid);
449         if (dev < 0)
450                 return NULL;
451         return class_num2obd(dev);
452 }
453 EXPORT_SYMBOL(class_uuid2obd);
454
455 /**
456  * Get obd device from ::obd_devs[]
457  *
458  * \param num [in] array index
459  *
460  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
461  *         otherwise return the obd device there.
462  */
463 struct obd_device *class_num2obd(int num)
464 {
465         struct obd_device *obd = NULL;
466
467         if (num < class_devno_max()) {
468                 obd = obd_devs[num];
469                 if (obd == NULL)
470                         return NULL;
471
472                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
473                          "%p obd_magic %08x != %08x\n",
474                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
475                 LASSERTF(obd->obd_minor == num,
476                          "%p obd_minor %0d != %0d\n",
477                          obd, obd->obd_minor, num);
478         }
479
480         return obd;
481 }
482 EXPORT_SYMBOL(class_num2obd);
483
484 /**
485  * Get obd devices count. Device in any
486  *    state are counted
487  * \retval obd device count
488  */
489 int get_devices_count(void)
490 {
491         int index, max_index = class_devno_max(), dev_count = 0;
492
493         read_lock(&obd_dev_lock);
494         for (index = 0; index <= max_index; index++) {
495                 struct obd_device *obd = class_num2obd(index);
496                 if (obd != NULL)
497                         dev_count++;
498         }
499         read_unlock(&obd_dev_lock);
500
501         return dev_count;
502 }
503 EXPORT_SYMBOL(get_devices_count);
504
505 void class_obd_list(void)
506 {
507         char *status;
508         int i;
509
510         read_lock(&obd_dev_lock);
511         for (i = 0; i < class_devno_max(); i++) {
512                 struct obd_device *obd = class_num2obd(i);
513
514                 if (obd == NULL)
515                         continue;
516                 if (obd->obd_stopping)
517                         status = "ST";
518                 else if (obd->obd_set_up)
519                         status = "UP";
520                 else if (obd->obd_attached)
521                         status = "AT";
522                 else
523                         status = "--";
524                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
525                          i, status, obd->obd_type->typ_name,
526                          obd->obd_name, obd->obd_uuid.uuid,
527                          cfs_atomic_read(&obd->obd_refcount));
528         }
529         read_unlock(&obd_dev_lock);
530         return;
531 }
532
533 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
534    specified, then only the client with that uuid is returned,
535    otherwise any client connected to the tgt is returned. */
536 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
537                                           const char * typ_name,
538                                           struct obd_uuid *grp_uuid)
539 {
540         int i;
541
542         read_lock(&obd_dev_lock);
543         for (i = 0; i < class_devno_max(); i++) {
544                 struct obd_device *obd = class_num2obd(i);
545
546                 if (obd == NULL)
547                         continue;
548                 if ((strncmp(obd->obd_type->typ_name, typ_name,
549                              strlen(typ_name)) == 0)) {
550                         if (obd_uuid_equals(tgt_uuid,
551                                             &obd->u.cli.cl_target_uuid) &&
552                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
553                                                          &obd->obd_uuid) : 1)) {
554                                 read_unlock(&obd_dev_lock);
555                                 return obd;
556                         }
557                 }
558         }
559         read_unlock(&obd_dev_lock);
560
561         return NULL;
562 }
563 EXPORT_SYMBOL(class_find_client_obd);
564
565 /* Iterate the obd_device list looking devices have grp_uuid. Start
566    searching at *next, and if a device is found, the next index to look
567    at is saved in *next. If next is NULL, then the first matching device
568    will always be returned. */
569 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
570 {
571         int i;
572
573         if (next == NULL)
574                 i = 0;
575         else if (*next >= 0 && *next < class_devno_max())
576                 i = *next;
577         else
578                 return NULL;
579
580         read_lock(&obd_dev_lock);
581         for (; i < class_devno_max(); i++) {
582                 struct obd_device *obd = class_num2obd(i);
583
584                 if (obd == NULL)
585                         continue;
586                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
587                         if (next != NULL)
588                                 *next = i+1;
589                         read_unlock(&obd_dev_lock);
590                         return obd;
591                 }
592         }
593         read_unlock(&obd_dev_lock);
594
595         return NULL;
596 }
597 EXPORT_SYMBOL(class_devices_in_group);
598
599 /**
600  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
601  * adjust sptlrpc settings accordingly.
602  */
603 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
604 {
605         struct obd_device  *obd;
606         const char         *type;
607         int                 i, rc = 0, rc2;
608
609         LASSERT(namelen > 0);
610
611         read_lock(&obd_dev_lock);
612         for (i = 0; i < class_devno_max(); i++) {
613                 obd = class_num2obd(i);
614
615                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
616                         continue;
617
618                 /* only notify mdc, osc, mdt, ost */
619                 type = obd->obd_type->typ_name;
620                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
621                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
622                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
623                     strcmp(type, LUSTRE_OST_NAME) != 0)
624                         continue;
625
626                 if (strncmp(obd->obd_name, fsname, namelen))
627                         continue;
628
629                 class_incref(obd, __FUNCTION__, obd);
630                 read_unlock(&obd_dev_lock);
631                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
632                                          sizeof(KEY_SPTLRPC_CONF),
633                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
634                 rc = rc ? rc : rc2;
635                 class_decref(obd, __FUNCTION__, obd);
636                 read_lock(&obd_dev_lock);
637         }
638         read_unlock(&obd_dev_lock);
639         return rc;
640 }
641 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
642
643 void obd_cleanup_caches(void)
644 {
645         ENTRY;
646         if (obd_device_cachep) {
647                 kmem_cache_destroy(obd_device_cachep);
648                 obd_device_cachep = NULL;
649         }
650         if (obdo_cachep) {
651                 kmem_cache_destroy(obdo_cachep);
652                 obdo_cachep = NULL;
653         }
654         if (import_cachep) {
655                 kmem_cache_destroy(import_cachep);
656                 import_cachep = NULL;
657         }
658         if (capa_cachep) {
659                 kmem_cache_destroy(capa_cachep);
660                 capa_cachep = NULL;
661         }
662         EXIT;
663 }
664
665 int obd_init_caches(void)
666 {
667         ENTRY;
668
669         LASSERT(obd_device_cachep == NULL);
670         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
671                                               sizeof(struct obd_device),
672                                               0, 0, NULL);
673         if (!obd_device_cachep)
674                 GOTO(out, -ENOMEM);
675
676         LASSERT(obdo_cachep == NULL);
677         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
678                                         0, 0, NULL);
679         if (!obdo_cachep)
680                 GOTO(out, -ENOMEM);
681
682         LASSERT(import_cachep == NULL);
683         import_cachep = kmem_cache_create("ll_import_cache",
684                                           sizeof(struct obd_import),
685                                           0, 0, NULL);
686         if (!import_cachep)
687                 GOTO(out, -ENOMEM);
688
689         LASSERT(capa_cachep == NULL);
690         capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
691                                         0, 0, NULL);
692         if (!capa_cachep)
693                 GOTO(out, -ENOMEM);
694
695         RETURN(0);
696 out:
697         obd_cleanup_caches();
698         RETURN(-ENOMEM);
699
700 }
701
702 /* map connection to client */
703 struct obd_export *class_conn2export(struct lustre_handle *conn)
704 {
705         struct obd_export *export;
706         ENTRY;
707
708         if (!conn) {
709                 CDEBUG(D_CACHE, "looking for null handle\n");
710                 RETURN(NULL);
711         }
712
713         if (conn->cookie == -1) {  /* this means assign a new connection */
714                 CDEBUG(D_CACHE, "want a new connection\n");
715                 RETURN(NULL);
716         }
717
718         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
719         export = class_handle2object(conn->cookie, NULL);
720         RETURN(export);
721 }
722 EXPORT_SYMBOL(class_conn2export);
723
724 struct obd_device *class_exp2obd(struct obd_export *exp)
725 {
726         if (exp)
727                 return exp->exp_obd;
728         return NULL;
729 }
730 EXPORT_SYMBOL(class_exp2obd);
731
732 struct obd_device *class_conn2obd(struct lustre_handle *conn)
733 {
734         struct obd_export *export;
735         export = class_conn2export(conn);
736         if (export) {
737                 struct obd_device *obd = export->exp_obd;
738                 class_export_put(export);
739                 return obd;
740         }
741         return NULL;
742 }
743 EXPORT_SYMBOL(class_conn2obd);
744
745 struct obd_import *class_exp2cliimp(struct obd_export *exp)
746 {
747         struct obd_device *obd = exp->exp_obd;
748         if (obd == NULL)
749                 return NULL;
750         return obd->u.cli.cl_import;
751 }
752 EXPORT_SYMBOL(class_exp2cliimp);
753
754 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
755 {
756         struct obd_device *obd = class_conn2obd(conn);
757         if (obd == NULL)
758                 return NULL;
759         return obd->u.cli.cl_import;
760 }
761 EXPORT_SYMBOL(class_conn2cliimp);
762
763 /* Export management functions */
764 static void class_export_destroy(struct obd_export *exp)
765 {
766         struct obd_device *obd = exp->exp_obd;
767         ENTRY;
768
769         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
770         LASSERT(obd != NULL);
771
772         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
773                exp->exp_client_uuid.uuid, obd->obd_name);
774
775         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
776         if (exp->exp_connection)
777                 ptlrpc_put_connection_superhack(exp->exp_connection);
778
779         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
780         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
781         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
782         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
783         obd_destroy_export(exp);
784         class_decref(obd, "export", exp);
785
786         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
787         EXIT;
788 }
789
790 static void export_handle_addref(void *export)
791 {
792         class_export_get(export);
793 }
794
795 static struct portals_handle_ops export_handle_ops = {
796         .hop_addref = export_handle_addref,
797         .hop_free   = NULL,
798 };
799
800 struct obd_export *class_export_get(struct obd_export *exp)
801 {
802         cfs_atomic_inc(&exp->exp_refcount);
803         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
804                cfs_atomic_read(&exp->exp_refcount));
805         return exp;
806 }
807 EXPORT_SYMBOL(class_export_get);
808
809 void class_export_put(struct obd_export *exp)
810 {
811         LASSERT(exp != NULL);
812         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
813         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
814                cfs_atomic_read(&exp->exp_refcount) - 1);
815
816         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
817                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
818                 CDEBUG(D_IOCTL, "final put %p/%s\n",
819                        exp, exp->exp_client_uuid.uuid);
820
821                 /* release nid stat refererence */
822                 lprocfs_exp_cleanup(exp);
823
824                 obd_zombie_export_add(exp);
825         }
826 }
827 EXPORT_SYMBOL(class_export_put);
828
829 /* Creates a new export, adds it to the hash table, and returns a
830  * pointer to it. The refcount is 2: one for the hash reference, and
831  * one for the pointer returned by this function. */
832 struct obd_export *class_new_export(struct obd_device *obd,
833                                     struct obd_uuid *cluuid)
834 {
835         struct obd_export *export;
836         cfs_hash_t *hash = NULL;
837         int rc = 0;
838         ENTRY;
839
840         OBD_ALLOC_PTR(export);
841         if (!export)
842                 return ERR_PTR(-ENOMEM);
843
844         export->exp_conn_cnt = 0;
845         export->exp_lock_hash = NULL;
846         export->exp_flock_hash = NULL;
847         cfs_atomic_set(&export->exp_refcount, 2);
848         cfs_atomic_set(&export->exp_rpc_count, 0);
849         cfs_atomic_set(&export->exp_cb_count, 0);
850         cfs_atomic_set(&export->exp_locks_count, 0);
851 #if LUSTRE_TRACKS_LOCK_EXP_REFS
852         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
853         spin_lock_init(&export->exp_locks_list_guard);
854 #endif
855         cfs_atomic_set(&export->exp_replay_count, 0);
856         export->exp_obd = obd;
857         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
858         spin_lock_init(&export->exp_uncommitted_replies_lock);
859         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
860         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
861         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
862         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
863         CFS_INIT_LIST_HEAD(&export->exp_reg_rpcs);
864         class_handle_hash(&export->exp_handle, &export_handle_ops);
865         export->exp_last_request_time = cfs_time_current_sec();
866         spin_lock_init(&export->exp_lock);
867         spin_lock_init(&export->exp_rpc_lock);
868         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
869         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
870         spin_lock_init(&export->exp_bl_list_lock);
871         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
872
873         export->exp_sp_peer = LUSTRE_SP_ANY;
874         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
875         export->exp_client_uuid = *cluuid;
876         obd_init_export(export);
877
878         spin_lock(&obd->obd_dev_lock);
879         /* shouldn't happen, but might race */
880         if (obd->obd_stopping)
881                 GOTO(exit_unlock, rc = -ENODEV);
882
883         hash = cfs_hash_getref(obd->obd_uuid_hash);
884         if (hash == NULL)
885                 GOTO(exit_unlock, rc = -ENODEV);
886         spin_unlock(&obd->obd_dev_lock);
887
888         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
889                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
890                 if (rc != 0) {
891                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
892                                       obd->obd_name, cluuid->uuid, rc);
893                         GOTO(exit_err, rc = -EALREADY);
894                 }
895         }
896
897         spin_lock(&obd->obd_dev_lock);
898         if (obd->obd_stopping) {
899                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
900                 GOTO(exit_unlock, rc = -ENODEV);
901         }
902
903         class_incref(obd, "export", export);
904         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
905         cfs_list_add_tail(&export->exp_obd_chain_timed,
906                           &export->exp_obd->obd_exports_timed);
907         export->exp_obd->obd_num_exports++;
908         spin_unlock(&obd->obd_dev_lock);
909         cfs_hash_putref(hash);
910         RETURN(export);
911
912 exit_unlock:
913         spin_unlock(&obd->obd_dev_lock);
914 exit_err:
915         if (hash)
916                 cfs_hash_putref(hash);
917         class_handle_unhash(&export->exp_handle);
918         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
919         obd_destroy_export(export);
920         OBD_FREE_PTR(export);
921         return ERR_PTR(rc);
922 }
923 EXPORT_SYMBOL(class_new_export);
924
925 void class_unlink_export(struct obd_export *exp)
926 {
927         class_handle_unhash(&exp->exp_handle);
928
929         spin_lock(&exp->exp_obd->obd_dev_lock);
930         /* delete an uuid-export hashitem from hashtables */
931         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
932                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
933                              &exp->exp_client_uuid,
934                              &exp->exp_uuid_hash);
935
936         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
937         cfs_list_del_init(&exp->exp_obd_chain_timed);
938         exp->exp_obd->obd_num_exports--;
939         spin_unlock(&exp->exp_obd->obd_dev_lock);
940         class_export_put(exp);
941 }
942 EXPORT_SYMBOL(class_unlink_export);
943
944 /* Import management functions */
945 void class_import_destroy(struct obd_import *imp)
946 {
947         ENTRY;
948
949         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
950                 imp->imp_obd->obd_name);
951
952         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
953
954         ptlrpc_put_connection_superhack(imp->imp_connection);
955
956         while (!cfs_list_empty(&imp->imp_conn_list)) {
957                 struct obd_import_conn *imp_conn;
958
959                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
960                                           struct obd_import_conn, oic_item);
961                 cfs_list_del_init(&imp_conn->oic_item);
962                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
963                 OBD_FREE(imp_conn, sizeof(*imp_conn));
964         }
965
966         LASSERT(imp->imp_sec == NULL);
967         class_decref(imp->imp_obd, "import", imp);
968         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
969         EXIT;
970 }
971
972 static void import_handle_addref(void *import)
973 {
974         class_import_get(import);
975 }
976
977 static struct portals_handle_ops import_handle_ops = {
978         .hop_addref = import_handle_addref,
979         .hop_free   = NULL,
980 };
981
982 struct obd_import *class_import_get(struct obd_import *import)
983 {
984         cfs_atomic_inc(&import->imp_refcount);
985         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
986                cfs_atomic_read(&import->imp_refcount),
987                import->imp_obd->obd_name);
988         return import;
989 }
990 EXPORT_SYMBOL(class_import_get);
991
992 void class_import_put(struct obd_import *imp)
993 {
994         ENTRY;
995
996         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
997         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
998
999         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1000                cfs_atomic_read(&imp->imp_refcount) - 1,
1001                imp->imp_obd->obd_name);
1002
1003         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1004                 CDEBUG(D_INFO, "final put import %p\n", imp);
1005                 obd_zombie_import_add(imp);
1006         }
1007
1008         /* catch possible import put race */
1009         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1010         EXIT;
1011 }
1012 EXPORT_SYMBOL(class_import_put);
1013
1014 static void init_imp_at(struct imp_at *at) {
1015         int i;
1016         at_init(&at->iat_net_latency, 0, 0);
1017         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1018                 /* max service estimates are tracked on the server side, so
1019                    don't use the AT history here, just use the last reported
1020                    val. (But keep hist for proc histogram, worst_ever) */
1021                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1022                         AT_FLG_NOHIST);
1023         }
1024 }
1025
1026 struct obd_import *class_new_import(struct obd_device *obd)
1027 {
1028         struct obd_import *imp;
1029
1030         OBD_ALLOC(imp, sizeof(*imp));
1031         if (imp == NULL)
1032                 return NULL;
1033
1034         CFS_INIT_LIST_HEAD(&imp->imp_pinger_chain);
1035         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1036         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1037         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1038         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1039         CFS_INIT_LIST_HEAD(&imp->imp_committed_list);
1040         imp->imp_replay_cursor = &imp->imp_committed_list;
1041         spin_lock_init(&imp->imp_lock);
1042         imp->imp_last_success_conn = 0;
1043         imp->imp_state = LUSTRE_IMP_NEW;
1044         imp->imp_obd = class_incref(obd, "import", imp);
1045         mutex_init(&imp->imp_sec_mutex);
1046         init_waitqueue_head(&imp->imp_recovery_waitq);
1047
1048         cfs_atomic_set(&imp->imp_refcount, 2);
1049         cfs_atomic_set(&imp->imp_unregistering, 0);
1050         cfs_atomic_set(&imp->imp_inflight, 0);
1051         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1052         cfs_atomic_set(&imp->imp_inval_count, 0);
1053         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1054         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1055         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1056         init_imp_at(&imp->imp_at);
1057
1058         /* the default magic is V2, will be used in connect RPC, and
1059          * then adjusted according to the flags in request/reply. */
1060         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1061
1062         return imp;
1063 }
1064 EXPORT_SYMBOL(class_new_import);
1065
1066 void class_destroy_import(struct obd_import *import)
1067 {
1068         LASSERT(import != NULL);
1069         LASSERT(import != LP_POISON);
1070
1071         class_handle_unhash(&import->imp_handle);
1072
1073         spin_lock(&import->imp_lock);
1074         import->imp_generation++;
1075         spin_unlock(&import->imp_lock);
1076         class_import_put(import);
1077 }
1078 EXPORT_SYMBOL(class_destroy_import);
1079
1080 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1081
1082 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1083 {
1084         spin_lock(&exp->exp_locks_list_guard);
1085
1086         LASSERT(lock->l_exp_refs_nr >= 0);
1087
1088         if (lock->l_exp_refs_target != NULL &&
1089             lock->l_exp_refs_target != exp) {
1090                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1091                               exp, lock, lock->l_exp_refs_target);
1092         }
1093         if ((lock->l_exp_refs_nr ++) == 0) {
1094                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1095                 lock->l_exp_refs_target = exp;
1096         }
1097         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1098                lock, exp, lock->l_exp_refs_nr);
1099         spin_unlock(&exp->exp_locks_list_guard);
1100 }
1101 EXPORT_SYMBOL(__class_export_add_lock_ref);
1102
1103 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1104 {
1105         spin_lock(&exp->exp_locks_list_guard);
1106         LASSERT(lock->l_exp_refs_nr > 0);
1107         if (lock->l_exp_refs_target != exp) {
1108                 LCONSOLE_WARN("lock %p, "
1109                               "mismatching export pointers: %p, %p\n",
1110                               lock, lock->l_exp_refs_target, exp);
1111         }
1112         if (-- lock->l_exp_refs_nr == 0) {
1113                 cfs_list_del_init(&lock->l_exp_refs_link);
1114                 lock->l_exp_refs_target = NULL;
1115         }
1116         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1117                lock, exp, lock->l_exp_refs_nr);
1118         spin_unlock(&exp->exp_locks_list_guard);
1119 }
1120 EXPORT_SYMBOL(__class_export_del_lock_ref);
1121 #endif
1122
1123 /* A connection defines an export context in which preallocation can
1124    be managed. This releases the export pointer reference, and returns
1125    the export handle, so the export refcount is 1 when this function
1126    returns. */
1127 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1128                   struct obd_uuid *cluuid)
1129 {
1130         struct obd_export *export;
1131         LASSERT(conn != NULL);
1132         LASSERT(obd != NULL);
1133         LASSERT(cluuid != NULL);
1134         ENTRY;
1135
1136         export = class_new_export(obd, cluuid);
1137         if (IS_ERR(export))
1138                 RETURN(PTR_ERR(export));
1139
1140         conn->cookie = export->exp_handle.h_cookie;
1141         class_export_put(export);
1142
1143         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1144                cluuid->uuid, conn->cookie);
1145         RETURN(0);
1146 }
1147 EXPORT_SYMBOL(class_connect);
1148
1149 /* if export is involved in recovery then clean up related things */
1150 void class_export_recovery_cleanup(struct obd_export *exp)
1151 {
1152         struct obd_device *obd = exp->exp_obd;
1153
1154         spin_lock(&obd->obd_recovery_task_lock);
1155         if (exp->exp_delayed)
1156                 obd->obd_delayed_clients--;
1157         if (obd->obd_recovering) {
1158                 if (exp->exp_in_recovery) {
1159                         spin_lock(&exp->exp_lock);
1160                         exp->exp_in_recovery = 0;
1161                         spin_unlock(&exp->exp_lock);
1162                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1163                         cfs_atomic_dec(&obd->obd_connected_clients);
1164                 }
1165
1166                 /* if called during recovery then should update
1167                  * obd_stale_clients counter,
1168                  * lightweight exports are not counted */
1169                 if (exp->exp_failed &&
1170                     (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1171                         exp->exp_obd->obd_stale_clients++;
1172         }
1173         spin_unlock(&obd->obd_recovery_task_lock);
1174         /** Cleanup req replay fields */
1175         if (exp->exp_req_replay_needed) {
1176                 spin_lock(&exp->exp_lock);
1177                 exp->exp_req_replay_needed = 0;
1178                 spin_unlock(&exp->exp_lock);
1179                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1180                 cfs_atomic_dec(&obd->obd_req_replay_clients);
1181         }
1182         /** Cleanup lock replay data */
1183         if (exp->exp_lock_replay_needed) {
1184                 spin_lock(&exp->exp_lock);
1185                 exp->exp_lock_replay_needed = 0;
1186                 spin_unlock(&exp->exp_lock);
1187                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1188                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1189         }
1190 }
1191
1192 /* This function removes 1-3 references from the export:
1193  * 1 - for export pointer passed
1194  * and if disconnect really need
1195  * 2 - removing from hash
1196  * 3 - in client_unlink_export
1197  * The export pointer passed to this function can destroyed */
1198 int class_disconnect(struct obd_export *export)
1199 {
1200         int already_disconnected;
1201         ENTRY;
1202
1203         if (export == NULL) {
1204                 CWARN("attempting to free NULL export %p\n", export);
1205                 RETURN(-EINVAL);
1206         }
1207
1208         spin_lock(&export->exp_lock);
1209         already_disconnected = export->exp_disconnected;
1210         export->exp_disconnected = 1;
1211         spin_unlock(&export->exp_lock);
1212
1213         /* class_cleanup(), abort_recovery(), and class_fail_export()
1214          * all end up in here, and if any of them race we shouldn't
1215          * call extra class_export_puts(). */
1216         if (already_disconnected) {
1217                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1218                 GOTO(no_disconn, already_disconnected);
1219         }
1220
1221         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1222                export->exp_handle.h_cookie);
1223
1224         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1225                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1226                              &export->exp_connection->c_peer.nid,
1227                              &export->exp_nid_hash);
1228
1229         class_export_recovery_cleanup(export);
1230         class_unlink_export(export);
1231 no_disconn:
1232         class_export_put(export);
1233         RETURN(0);
1234 }
1235 EXPORT_SYMBOL(class_disconnect);
1236
1237 /* Return non-zero for a fully connected export */
1238 int class_connected_export(struct obd_export *exp)
1239 {
1240         int connected = 0;
1241
1242         if (exp) {
1243                 spin_lock(&exp->exp_lock);
1244                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1245                 spin_unlock(&exp->exp_lock);
1246         }
1247         return connected;
1248 }
1249 EXPORT_SYMBOL(class_connected_export);
1250
1251 static void class_disconnect_export_list(cfs_list_t *list,
1252                                          enum obd_option flags)
1253 {
1254         int rc;
1255         struct obd_export *exp;
1256         ENTRY;
1257
1258         /* It's possible that an export may disconnect itself, but
1259          * nothing else will be added to this list. */
1260         while (!cfs_list_empty(list)) {
1261                 exp = cfs_list_entry(list->next, struct obd_export,
1262                                      exp_obd_chain);
1263                 /* need for safe call CDEBUG after obd_disconnect */
1264                 class_export_get(exp);
1265
1266                 spin_lock(&exp->exp_lock);
1267                 exp->exp_flags = flags;
1268                 spin_unlock(&exp->exp_lock);
1269
1270                 if (obd_uuid_equals(&exp->exp_client_uuid,
1271                                     &exp->exp_obd->obd_uuid)) {
1272                         CDEBUG(D_HA,
1273                                "exp %p export uuid == obd uuid, don't discon\n",
1274                                exp);
1275                         /* Need to delete this now so we don't end up pointing
1276                          * to work_list later when this export is cleaned up. */
1277                         cfs_list_del_init(&exp->exp_obd_chain);
1278                         class_export_put(exp);
1279                         continue;
1280                 }
1281
1282                 class_export_get(exp);
1283                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1284                        "last request at "CFS_TIME_T"\n",
1285                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1286                        exp, exp->exp_last_request_time);
1287                 /* release one export reference anyway */
1288                 rc = obd_disconnect(exp);
1289
1290                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1291                        obd_export_nid2str(exp), exp, rc);
1292                 class_export_put(exp);
1293         }
1294         EXIT;
1295 }
1296
1297 void class_disconnect_exports(struct obd_device *obd)
1298 {
1299         cfs_list_t work_list;
1300         ENTRY;
1301
1302         /* Move all of the exports from obd_exports to a work list, en masse. */
1303         CFS_INIT_LIST_HEAD(&work_list);
1304         spin_lock(&obd->obd_dev_lock);
1305         cfs_list_splice_init(&obd->obd_exports, &work_list);
1306         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1307         spin_unlock(&obd->obd_dev_lock);
1308
1309         if (!cfs_list_empty(&work_list)) {
1310                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1311                        "disconnecting them\n", obd->obd_minor, obd);
1312                 class_disconnect_export_list(&work_list,
1313                                              exp_flags_from_obd(obd));
1314         } else
1315                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1316                        obd->obd_minor, obd);
1317         EXIT;
1318 }
1319 EXPORT_SYMBOL(class_disconnect_exports);
1320
1321 /* Remove exports that have not completed recovery.
1322  */
1323 void class_disconnect_stale_exports(struct obd_device *obd,
1324                                     int (*test_export)(struct obd_export *))
1325 {
1326         cfs_list_t work_list;
1327         struct obd_export *exp, *n;
1328         int evicted = 0;
1329         ENTRY;
1330
1331         CFS_INIT_LIST_HEAD(&work_list);
1332         spin_lock(&obd->obd_dev_lock);
1333         cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1334                                      exp_obd_chain) {
1335                 /* don't count self-export as client */
1336                 if (obd_uuid_equals(&exp->exp_client_uuid,
1337                                     &exp->exp_obd->obd_uuid))
1338                         continue;
1339
1340                 /* don't evict clients which have no slot in last_rcvd
1341                  * (e.g. lightweight connection) */
1342                 if (exp->exp_target_data.ted_lr_idx == -1)
1343                         continue;
1344
1345                 spin_lock(&exp->exp_lock);
1346                 if (exp->exp_failed || test_export(exp)) {
1347                         spin_unlock(&exp->exp_lock);
1348                         continue;
1349                 }
1350                 exp->exp_failed = 1;
1351                 spin_unlock(&exp->exp_lock);
1352
1353                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1354                 evicted++;
1355                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1356                        obd->obd_name, exp->exp_client_uuid.uuid,
1357                        exp->exp_connection == NULL ? "<unknown>" :
1358                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1359                 print_export_data(exp, "EVICTING", 0);
1360         }
1361         spin_unlock(&obd->obd_dev_lock);
1362
1363         if (evicted)
1364                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1365                               obd->obd_name, evicted);
1366
1367         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1368                                                  OBD_OPT_ABORT_RECOV);
1369         EXIT;
1370 }
1371 EXPORT_SYMBOL(class_disconnect_stale_exports);
1372
1373 void class_fail_export(struct obd_export *exp)
1374 {
1375         int rc, already_failed;
1376
1377         spin_lock(&exp->exp_lock);
1378         already_failed = exp->exp_failed;
1379         exp->exp_failed = 1;
1380         spin_unlock(&exp->exp_lock);
1381
1382         if (already_failed) {
1383                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1384                        exp, exp->exp_client_uuid.uuid);
1385                 return;
1386         }
1387
1388         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1389                exp, exp->exp_client_uuid.uuid);
1390
1391         if (obd_dump_on_timeout)
1392                 libcfs_debug_dumplog();
1393
1394         /* need for safe call CDEBUG after obd_disconnect */
1395         class_export_get(exp);
1396
1397         /* Most callers into obd_disconnect are removing their own reference
1398          * (request, for example) in addition to the one from the hash table.
1399          * We don't have such a reference here, so make one. */
1400         class_export_get(exp);
1401         rc = obd_disconnect(exp);
1402         if (rc)
1403                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1404         else
1405                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1406                        exp, exp->exp_client_uuid.uuid);
1407         class_export_put(exp);
1408 }
1409 EXPORT_SYMBOL(class_fail_export);
1410
1411 char *obd_export_nid2str(struct obd_export *exp)
1412 {
1413         if (exp->exp_connection != NULL)
1414                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1415
1416         return "(no nid)";
1417 }
1418 EXPORT_SYMBOL(obd_export_nid2str);
1419
1420 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1421 {
1422         cfs_hash_t *nid_hash;
1423         struct obd_export *doomed_exp = NULL;
1424         int exports_evicted = 0;
1425
1426         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1427
1428         spin_lock(&obd->obd_dev_lock);
1429         /* umount has run already, so evict thread should leave
1430          * its task to umount thread now */
1431         if (obd->obd_stopping) {
1432                 spin_unlock(&obd->obd_dev_lock);
1433                 return exports_evicted;
1434         }
1435         nid_hash = obd->obd_nid_hash;
1436         cfs_hash_getref(nid_hash);
1437         spin_unlock(&obd->obd_dev_lock);
1438
1439         do {
1440                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1441                 if (doomed_exp == NULL)
1442                         break;
1443
1444                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1445                          "nid %s found, wanted nid %s, requested nid %s\n",
1446                          obd_export_nid2str(doomed_exp),
1447                          libcfs_nid2str(nid_key), nid);
1448                 LASSERTF(doomed_exp != obd->obd_self_export,
1449                          "self-export is hashed by NID?\n");
1450                 exports_evicted++;
1451                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1452                               "request\n", obd->obd_name,
1453                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1454                               obd_export_nid2str(doomed_exp));
1455                 class_fail_export(doomed_exp);
1456                 class_export_put(doomed_exp);
1457         } while (1);
1458
1459         cfs_hash_putref(nid_hash);
1460
1461         if (!exports_evicted)
1462                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1463                        obd->obd_name, nid);
1464         return exports_evicted;
1465 }
1466 EXPORT_SYMBOL(obd_export_evict_by_nid);
1467
1468 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1469 {
1470         cfs_hash_t *uuid_hash;
1471         struct obd_export *doomed_exp = NULL;
1472         struct obd_uuid doomed_uuid;
1473         int exports_evicted = 0;
1474
1475         spin_lock(&obd->obd_dev_lock);
1476         if (obd->obd_stopping) {
1477                 spin_unlock(&obd->obd_dev_lock);
1478                 return exports_evicted;
1479         }
1480         uuid_hash = obd->obd_uuid_hash;
1481         cfs_hash_getref(uuid_hash);
1482         spin_unlock(&obd->obd_dev_lock);
1483
1484         obd_str2uuid(&doomed_uuid, uuid);
1485         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1486                 CERROR("%s: can't evict myself\n", obd->obd_name);
1487                 cfs_hash_putref(uuid_hash);
1488                 return exports_evicted;
1489         }
1490
1491         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1492
1493         if (doomed_exp == NULL) {
1494                 CERROR("%s: can't disconnect %s: no exports found\n",
1495                        obd->obd_name, uuid);
1496         } else {
1497                 CWARN("%s: evicting %s at adminstrative request\n",
1498                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1499                 class_fail_export(doomed_exp);
1500                 class_export_put(doomed_exp);
1501                 exports_evicted++;
1502         }
1503         cfs_hash_putref(uuid_hash);
1504
1505         return exports_evicted;
1506 }
1507 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1508
1509 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1510 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1511 EXPORT_SYMBOL(class_export_dump_hook);
1512 #endif
1513
1514 static void print_export_data(struct obd_export *exp, const char *status,
1515                               int locks)
1516 {
1517         struct ptlrpc_reply_state *rs;
1518         struct ptlrpc_reply_state *first_reply = NULL;
1519         int nreplies = 0;
1520
1521         spin_lock(&exp->exp_lock);
1522         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1523                                 rs_exp_list) {
1524                 if (nreplies == 0)
1525                         first_reply = rs;
1526                 nreplies++;
1527         }
1528         spin_unlock(&exp->exp_lock);
1529
1530         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1531                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1532                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1533                cfs_atomic_read(&exp->exp_rpc_count),
1534                cfs_atomic_read(&exp->exp_cb_count),
1535                cfs_atomic_read(&exp->exp_locks_count),
1536                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1537                nreplies, first_reply, nreplies > 3 ? "..." : "",
1538                exp->exp_last_committed);
1539 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1540         if (locks && class_export_dump_hook != NULL)
1541                 class_export_dump_hook(exp);
1542 #endif
1543 }
1544
1545 void dump_exports(struct obd_device *obd, int locks)
1546 {
1547         struct obd_export *exp;
1548
1549         spin_lock(&obd->obd_dev_lock);
1550         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1551                 print_export_data(exp, "ACTIVE", locks);
1552         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1553                 print_export_data(exp, "UNLINKED", locks);
1554         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1555                 print_export_data(exp, "DELAYED", locks);
1556         spin_unlock(&obd->obd_dev_lock);
1557         spin_lock(&obd_zombie_impexp_lock);
1558         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1559                 print_export_data(exp, "ZOMBIE", locks);
1560         spin_unlock(&obd_zombie_impexp_lock);
1561 }
1562 EXPORT_SYMBOL(dump_exports);
1563
1564 void obd_exports_barrier(struct obd_device *obd)
1565 {
1566         int waited = 2;
1567         LASSERT(cfs_list_empty(&obd->obd_exports));
1568         spin_lock(&obd->obd_dev_lock);
1569         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1570                 spin_unlock(&obd->obd_dev_lock);
1571                 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1572                                                    cfs_time_seconds(waited));
1573                 if (waited > 5 && IS_PO2(waited)) {
1574                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1575                                       "more than %d seconds. "
1576                                       "The obd refcount = %d. Is it stuck?\n",
1577                                       obd->obd_name, waited,
1578                                       cfs_atomic_read(&obd->obd_refcount));
1579                         dump_exports(obd, 1);
1580                 }
1581                 waited *= 2;
1582                 spin_lock(&obd->obd_dev_lock);
1583         }
1584         spin_unlock(&obd->obd_dev_lock);
1585 }
1586 EXPORT_SYMBOL(obd_exports_barrier);
1587
1588 /* Total amount of zombies to be destroyed */
1589 static int zombies_count = 0;
1590
1591 /**
1592  * kill zombie imports and exports
1593  */
1594 void obd_zombie_impexp_cull(void)
1595 {
1596         struct obd_import *import;
1597         struct obd_export *export;
1598         ENTRY;
1599
1600         do {
1601                 spin_lock(&obd_zombie_impexp_lock);
1602
1603                 import = NULL;
1604                 if (!cfs_list_empty(&obd_zombie_imports)) {
1605                         import = cfs_list_entry(obd_zombie_imports.next,
1606                                                 struct obd_import,
1607                                                 imp_zombie_chain);
1608                         cfs_list_del_init(&import->imp_zombie_chain);
1609                 }
1610
1611                 export = NULL;
1612                 if (!cfs_list_empty(&obd_zombie_exports)) {
1613                         export = cfs_list_entry(obd_zombie_exports.next,
1614                                                 struct obd_export,
1615                                                 exp_obd_chain);
1616                         cfs_list_del_init(&export->exp_obd_chain);
1617                 }
1618
1619                 spin_unlock(&obd_zombie_impexp_lock);
1620
1621                 if (import != NULL) {
1622                         class_import_destroy(import);
1623                         spin_lock(&obd_zombie_impexp_lock);
1624                         zombies_count--;
1625                         spin_unlock(&obd_zombie_impexp_lock);
1626                 }
1627
1628                 if (export != NULL) {
1629                         class_export_destroy(export);
1630                         spin_lock(&obd_zombie_impexp_lock);
1631                         zombies_count--;
1632                         spin_unlock(&obd_zombie_impexp_lock);
1633                 }
1634
1635                 cond_resched();
1636         } while (import != NULL || export != NULL);
1637         EXIT;
1638 }
1639
1640 static struct completion        obd_zombie_start;
1641 static struct completion        obd_zombie_stop;
1642 static unsigned long            obd_zombie_flags;
1643 static wait_queue_head_t        obd_zombie_waitq;
1644 static pid_t                    obd_zombie_pid;
1645
1646 enum {
1647         OBD_ZOMBIE_STOP         = 0x0001,
1648 };
1649
1650 /**
1651  * check for work for kill zombie import/export thread.
1652  */
1653 static int obd_zombie_impexp_check(void *arg)
1654 {
1655         int rc;
1656
1657         spin_lock(&obd_zombie_impexp_lock);
1658         rc = (zombies_count == 0) &&
1659              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1660         spin_unlock(&obd_zombie_impexp_lock);
1661
1662         RETURN(rc);
1663 }
1664
1665 /**
1666  * Add export to the obd_zombe thread and notify it.
1667  */
1668 static void obd_zombie_export_add(struct obd_export *exp) {
1669         spin_lock(&exp->exp_obd->obd_dev_lock);
1670         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1671         cfs_list_del_init(&exp->exp_obd_chain);
1672         spin_unlock(&exp->exp_obd->obd_dev_lock);
1673         spin_lock(&obd_zombie_impexp_lock);
1674         zombies_count++;
1675         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1676         spin_unlock(&obd_zombie_impexp_lock);
1677
1678         obd_zombie_impexp_notify();
1679 }
1680
1681 /**
1682  * Add import to the obd_zombe thread and notify it.
1683  */
1684 static void obd_zombie_import_add(struct obd_import *imp) {
1685         LASSERT(imp->imp_sec == NULL);
1686         LASSERT(imp->imp_rq_pool == NULL);
1687         spin_lock(&obd_zombie_impexp_lock);
1688         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1689         zombies_count++;
1690         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1691         spin_unlock(&obd_zombie_impexp_lock);
1692
1693         obd_zombie_impexp_notify();
1694 }
1695
1696 /**
1697  * notify import/export destroy thread about new zombie.
1698  */
1699 static void obd_zombie_impexp_notify(void)
1700 {
1701         /*
1702          * Make sure obd_zomebie_impexp_thread get this notification.
1703          * It is possible this signal only get by obd_zombie_barrier, and
1704          * barrier gulps this notification and sleeps away and hangs ensues
1705          */
1706         wake_up_all(&obd_zombie_waitq);
1707 }
1708
1709 /**
1710  * check whether obd_zombie is idle
1711  */
1712 static int obd_zombie_is_idle(void)
1713 {
1714         int rc;
1715
1716         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1717         spin_lock(&obd_zombie_impexp_lock);
1718         rc = (zombies_count == 0);
1719         spin_unlock(&obd_zombie_impexp_lock);
1720         return rc;
1721 }
1722
1723 /**
1724  * wait when obd_zombie import/export queues become empty
1725  */
1726 void obd_zombie_barrier(void)
1727 {
1728         struct l_wait_info lwi = { 0 };
1729
1730         if (obd_zombie_pid == current_pid())
1731                 /* don't wait for myself */
1732                 return;
1733         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1734 }
1735 EXPORT_SYMBOL(obd_zombie_barrier);
1736
1737 #ifdef __KERNEL__
1738
1739 /**
1740  * destroy zombie export/import thread.
1741  */
1742 static int obd_zombie_impexp_thread(void *unused)
1743 {
1744         unshare_fs_struct();
1745         complete(&obd_zombie_start);
1746
1747         obd_zombie_pid = current_pid();
1748
1749         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1750                 struct l_wait_info lwi = { 0 };
1751
1752                 l_wait_event(obd_zombie_waitq,
1753                              !obd_zombie_impexp_check(NULL), &lwi);
1754                 obd_zombie_impexp_cull();
1755
1756                 /*
1757                  * Notify obd_zombie_barrier callers that queues
1758                  * may be empty.
1759                  */
1760                 wake_up(&obd_zombie_waitq);
1761         }
1762
1763         complete(&obd_zombie_stop);
1764
1765         RETURN(0);
1766 }
1767
1768 #else /* ! KERNEL */
1769
1770 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1771 static void *obd_zombie_impexp_work_cb;
1772 static void *obd_zombie_impexp_idle_cb;
1773
1774 int obd_zombie_impexp_kill(void *arg)
1775 {
1776         int rc = 0;
1777
1778         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1779                 obd_zombie_impexp_cull();
1780                 rc = 1;
1781         }
1782         cfs_atomic_dec(&zombie_recur);
1783         return rc;
1784 }
1785
1786 #endif
1787
1788 /**
1789  * start destroy zombie import/export thread
1790  */
1791 int obd_zombie_impexp_init(void)
1792 {
1793 #ifdef __KERNEL__
1794         struct task_struct *task;
1795 #endif
1796
1797         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1798         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1799         spin_lock_init(&obd_zombie_impexp_lock);
1800         init_completion(&obd_zombie_start);
1801         init_completion(&obd_zombie_stop);
1802         init_waitqueue_head(&obd_zombie_waitq);
1803         obd_zombie_pid = 0;
1804
1805 #ifdef __KERNEL__
1806         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1807         if (IS_ERR(task))
1808                 RETURN(PTR_ERR(task));
1809
1810         wait_for_completion(&obd_zombie_start);
1811 #else
1812
1813         obd_zombie_impexp_work_cb =
1814                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1815                                                  &obd_zombie_impexp_kill, NULL);
1816
1817         obd_zombie_impexp_idle_cb =
1818                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1819                                                  &obd_zombie_impexp_check, NULL);
1820 #endif
1821         RETURN(0);
1822 }
1823 /**
1824  * stop destroy zombie import/export thread
1825  */
1826 void obd_zombie_impexp_stop(void)
1827 {
1828         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1829         obd_zombie_impexp_notify();
1830 #ifdef __KERNEL__
1831         wait_for_completion(&obd_zombie_stop);
1832 #else
1833         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1834         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1835 #endif
1836 }
1837
1838 /***** Kernel-userspace comm helpers *******/
1839
1840 /* Get length of entire message, including header */
1841 int kuc_len(int payload_len)
1842 {
1843         return sizeof(struct kuc_hdr) + payload_len;
1844 }
1845 EXPORT_SYMBOL(kuc_len);
1846
1847 /* Get a pointer to kuc header, given a ptr to the payload
1848  * @param p Pointer to payload area
1849  * @returns Pointer to kuc header
1850  */
1851 struct kuc_hdr * kuc_ptr(void *p)
1852 {
1853         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1854         LASSERT(lh->kuc_magic == KUC_MAGIC);
1855         return lh;
1856 }
1857 EXPORT_SYMBOL(kuc_ptr);
1858
1859 /* Test if payload is part of kuc message
1860  * @param p Pointer to payload area
1861  * @returns boolean
1862  */
1863 int kuc_ispayload(void *p)
1864 {
1865         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1866
1867         if (kh->kuc_magic == KUC_MAGIC)
1868                 return 1;
1869         else
1870                 return 0;
1871 }
1872 EXPORT_SYMBOL(kuc_ispayload);
1873
1874 /* Alloc space for a message, and fill in header
1875  * @return Pointer to payload area
1876  */
1877 void *kuc_alloc(int payload_len, int transport, int type)
1878 {
1879         struct kuc_hdr *lh;
1880         int len = kuc_len(payload_len);
1881
1882         OBD_ALLOC(lh, len);
1883         if (lh == NULL)
1884                 return ERR_PTR(-ENOMEM);
1885
1886         lh->kuc_magic = KUC_MAGIC;
1887         lh->kuc_transport = transport;
1888         lh->kuc_msgtype = type;
1889         lh->kuc_msglen = len;
1890
1891         return (void *)(lh + 1);
1892 }
1893 EXPORT_SYMBOL(kuc_alloc);
1894
1895 /* Takes pointer to payload area */
1896 inline void kuc_free(void *p, int payload_len)
1897 {
1898         struct kuc_hdr *lh = kuc_ptr(p);
1899         OBD_FREE(lh, kuc_len(payload_len));
1900 }
1901 EXPORT_SYMBOL(kuc_free);
1902
1903
1904