Whamcloud - gitweb
LU-1095 debug: Quiet/cleanup various common console messages
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
50 extern cfs_list_t obd_types;
51 spinlock_t obd_types_lock;
52
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
57
58 cfs_list_t      obd_zombie_imports;
59 cfs_list_t      obd_zombie_exports;
60 spinlock_t  obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         cfs_list_t *tmp;
102         struct obd_type *type;
103
104         spin_lock(&obd_types_lock);
105         cfs_list_for_each(tmp, &obd_types) {
106                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115 EXPORT_SYMBOL(class_search_type);
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124
125                 if (strcmp(modname, "obdfilter") == 0)
126                         modname = "ofd";
127
128                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
129                         modname = LUSTRE_MDT_NAME;
130
131                 if (!cfs_request_module("%s", modname)) {
132                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
133                         type = class_search_type(name);
134                 } else {
135                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
136                                            modname);
137                 }
138         }
139 #endif
140         if (type) {
141                 spin_lock(&type->obd_type_lock);
142                 type->typ_refcnt++;
143                 cfs_try_module_get(type->typ_dt_ops->o_owner);
144                 spin_unlock(&type->obd_type_lock);
145         }
146         return type;
147 }
148 EXPORT_SYMBOL(class_get_type);
149
150 void class_put_type(struct obd_type *type)
151 {
152         LASSERT(type);
153         spin_lock(&type->obd_type_lock);
154         type->typ_refcnt--;
155         cfs_module_put(type->typ_dt_ops->o_owner);
156         spin_unlock(&type->obd_type_lock);
157 }
158 EXPORT_SYMBOL(class_put_type);
159
160 #define CLASS_MAX_NAME 1024
161
162 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
163                         struct lprocfs_vars *vars, const char *name,
164                         struct lu_device_type *ldt)
165 {
166         struct obd_type *type;
167         int rc = 0;
168         ENTRY;
169
170         /* sanity check */
171         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
172
173         if (class_search_type(name)) {
174                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
175                 RETURN(-EEXIST);
176         }
177
178         rc = -ENOMEM;
179         OBD_ALLOC(type, sizeof(*type));
180         if (type == NULL)
181                 RETURN(rc);
182
183         OBD_ALLOC_PTR(type->typ_dt_ops);
184         OBD_ALLOC_PTR(type->typ_md_ops);
185         OBD_ALLOC(type->typ_name, strlen(name) + 1);
186
187         if (type->typ_dt_ops == NULL ||
188             type->typ_md_ops == NULL ||
189             type->typ_name == NULL)
190                 GOTO (failed, rc);
191
192         *(type->typ_dt_ops) = *dt_ops;
193         /* md_ops is optional */
194         if (md_ops)
195                 *(type->typ_md_ops) = *md_ops;
196         strcpy(type->typ_name, name);
197         spin_lock_init(&type->obd_type_lock);
198
199 #ifdef LPROCFS
200         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
201                                               vars, type);
202         if (IS_ERR(type->typ_procroot)) {
203                 rc = PTR_ERR(type->typ_procroot);
204                 type->typ_procroot = NULL;
205                 GOTO (failed, rc);
206         }
207 #endif
208         if (ldt != NULL) {
209                 type->typ_lu = ldt;
210                 rc = lu_device_type_init(ldt);
211                 if (rc != 0)
212                         GOTO (failed, rc);
213         }
214
215         spin_lock(&obd_types_lock);
216         cfs_list_add(&type->typ_chain, &obd_types);
217         spin_unlock(&obd_types_lock);
218
219         RETURN (0);
220
221  failed:
222         if (type->typ_name != NULL)
223                 OBD_FREE(type->typ_name, strlen(name) + 1);
224         if (type->typ_md_ops != NULL)
225                 OBD_FREE_PTR(type->typ_md_ops);
226         if (type->typ_dt_ops != NULL)
227                 OBD_FREE_PTR(type->typ_dt_ops);
228         OBD_FREE(type, sizeof(*type));
229         RETURN(rc);
230 }
231 EXPORT_SYMBOL(class_register_type);
232
233 int class_unregister_type(const char *name)
234 {
235         struct obd_type *type = class_search_type(name);
236         ENTRY;
237
238         if (!type) {
239                 CERROR("unknown obd type\n");
240                 RETURN(-EINVAL);
241         }
242
243         if (type->typ_refcnt) {
244                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
245                 /* This is a bad situation, let's make the best of it */
246                 /* Remove ops, but leave the name for debugging */
247                 OBD_FREE_PTR(type->typ_dt_ops);
248                 OBD_FREE_PTR(type->typ_md_ops);
249                 RETURN(-EBUSY);
250         }
251
252         /* we do not use type->typ_procroot as for compatibility purposes
253          * other modules can share names (i.e. lod can use lov entry). so
254          * we can't reference pointer as it can get invalided when another
255          * module removes the entry */
256         lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
257
258         if (type->typ_lu)
259                 lu_device_type_fini(type->typ_lu);
260
261         spin_lock(&obd_types_lock);
262         cfs_list_del(&type->typ_chain);
263         spin_unlock(&obd_types_lock);
264         OBD_FREE(type->typ_name, strlen(name) + 1);
265         if (type->typ_dt_ops != NULL)
266                 OBD_FREE_PTR(type->typ_dt_ops);
267         if (type->typ_md_ops != NULL)
268                 OBD_FREE_PTR(type->typ_md_ops);
269         OBD_FREE(type, sizeof(*type));
270         RETURN(0);
271 } /* class_unregister_type */
272 EXPORT_SYMBOL(class_unregister_type);
273
274 /**
275  * Create a new obd device.
276  *
277  * Find an empty slot in ::obd_devs[], create a new obd device in it.
278  *
279  * \param[in] type_name obd device type string.
280  * \param[in] name      obd device name.
281  *
282  * \retval NULL if create fails, otherwise return the obd device
283  *         pointer created.
284  */
285 struct obd_device *class_newdev(const char *type_name, const char *name)
286 {
287         struct obd_device *result = NULL;
288         struct obd_device *newdev;
289         struct obd_type *type = NULL;
290         int i;
291         int new_obd_minor = 0;
292         ENTRY;
293
294         if (strlen(name) >= MAX_OBD_NAME) {
295                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
296                 RETURN(ERR_PTR(-EINVAL));
297         }
298
299         type = class_get_type(type_name);
300         if (type == NULL){
301                 CERROR("OBD: unknown type: %s\n", type_name);
302                 RETURN(ERR_PTR(-ENODEV));
303         }
304
305         newdev = obd_device_alloc();
306         if (newdev == NULL)
307                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
308
309         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
310
311         write_lock(&obd_dev_lock);
312         for (i = 0; i < class_devno_max(); i++) {
313                 struct obd_device *obd = class_num2obd(i);
314
315                 if (obd && obd->obd_name &&
316                     (strcmp(name, obd->obd_name) == 0)) {
317                         CERROR("Device %s already exists at %d, won't add\n",
318                                name, i);
319                         if (result) {
320                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
321                                          "%p obd_magic %08x != %08x\n", result,
322                                          result->obd_magic, OBD_DEVICE_MAGIC);
323                                 LASSERTF(result->obd_minor == new_obd_minor,
324                                          "%p obd_minor %d != %d\n", result,
325                                          result->obd_minor, new_obd_minor);
326
327                                 obd_devs[result->obd_minor] = NULL;
328                                 result->obd_name[0]='\0';
329                          }
330                         result = ERR_PTR(-EEXIST);
331                         break;
332                 }
333                 if (!result && !obd) {
334                         result = newdev;
335                         result->obd_minor = i;
336                         new_obd_minor = i;
337                         result->obd_type = type;
338                         strncpy(result->obd_name, name,
339                                 sizeof(result->obd_name) - 1);
340                         obd_devs[i] = result;
341                 }
342         }
343         write_unlock(&obd_dev_lock);
344
345         if (result == NULL && i >= class_devno_max()) {
346                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
347                        class_devno_max());
348                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
349         }
350
351         if (IS_ERR(result))
352                 GOTO(out, result);
353
354         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
355                result->obd_name, result);
356
357         RETURN(result);
358 out:
359         obd_device_free(newdev);
360 out_type:
361         class_put_type(type);
362         return result;
363 }
364
365 void class_release_dev(struct obd_device *obd)
366 {
367         struct obd_type *obd_type = obd->obd_type;
368
369         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
370                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
371         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
372                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
373         LASSERT(obd_type != NULL);
374
375         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
376                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
377
378         write_lock(&obd_dev_lock);
379         obd_devs[obd->obd_minor] = NULL;
380         write_unlock(&obd_dev_lock);
381         obd_device_free(obd);
382
383         class_put_type(obd_type);
384 }
385
386 int class_name2dev(const char *name)
387 {
388         int i;
389
390         if (!name)
391                 return -1;
392
393         read_lock(&obd_dev_lock);
394         for (i = 0; i < class_devno_max(); i++) {
395                 struct obd_device *obd = class_num2obd(i);
396
397                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
398                         /* Make sure we finished attaching before we give
399                            out any references */
400                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
401                         if (obd->obd_attached) {
402                                 read_unlock(&obd_dev_lock);
403                                 return i;
404                         }
405                         break;
406                 }
407         }
408         read_unlock(&obd_dev_lock);
409
410         return -1;
411 }
412 EXPORT_SYMBOL(class_name2dev);
413
414 struct obd_device *class_name2obd(const char *name)
415 {
416         int dev = class_name2dev(name);
417
418         if (dev < 0 || dev > class_devno_max())
419                 return NULL;
420         return class_num2obd(dev);
421 }
422 EXPORT_SYMBOL(class_name2obd);
423
424 int class_uuid2dev(struct obd_uuid *uuid)
425 {
426         int i;
427
428         read_lock(&obd_dev_lock);
429         for (i = 0; i < class_devno_max(); i++) {
430                 struct obd_device *obd = class_num2obd(i);
431
432                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
433                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
434                         read_unlock(&obd_dev_lock);
435                         return i;
436                 }
437         }
438         read_unlock(&obd_dev_lock);
439
440         return -1;
441 }
442 EXPORT_SYMBOL(class_uuid2dev);
443
444 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
445 {
446         int dev = class_uuid2dev(uuid);
447         if (dev < 0)
448                 return NULL;
449         return class_num2obd(dev);
450 }
451 EXPORT_SYMBOL(class_uuid2obd);
452
453 /**
454  * Get obd device from ::obd_devs[]
455  *
456  * \param num [in] array index
457  *
458  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
459  *         otherwise return the obd device there.
460  */
461 struct obd_device *class_num2obd(int num)
462 {
463         struct obd_device *obd = NULL;
464
465         if (num < class_devno_max()) {
466                 obd = obd_devs[num];
467                 if (obd == NULL)
468                         return NULL;
469
470                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
471                          "%p obd_magic %08x != %08x\n",
472                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
473                 LASSERTF(obd->obd_minor == num,
474                          "%p obd_minor %0d != %0d\n",
475                          obd, obd->obd_minor, num);
476         }
477
478         return obd;
479 }
480 EXPORT_SYMBOL(class_num2obd);
481
482 /**
483  * Get obd devices count. Device in any
484  *    state are counted
485  * \retval obd device count
486  */
487 int get_devices_count(void)
488 {
489         int index, max_index = class_devno_max(), dev_count = 0;
490
491         read_lock(&obd_dev_lock);
492         for (index = 0; index <= max_index; index++) {
493                 struct obd_device *obd = class_num2obd(index);
494                 if (obd != NULL)
495                         dev_count++;
496         }
497         read_unlock(&obd_dev_lock);
498
499         return dev_count;
500 }
501 EXPORT_SYMBOL(get_devices_count);
502
503 void class_obd_list(void)
504 {
505         char *status;
506         int i;
507
508         read_lock(&obd_dev_lock);
509         for (i = 0; i < class_devno_max(); i++) {
510                 struct obd_device *obd = class_num2obd(i);
511
512                 if (obd == NULL)
513                         continue;
514                 if (obd->obd_stopping)
515                         status = "ST";
516                 else if (obd->obd_set_up)
517                         status = "UP";
518                 else if (obd->obd_attached)
519                         status = "AT";
520                 else
521                         status = "--";
522                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
523                          i, status, obd->obd_type->typ_name,
524                          obd->obd_name, obd->obd_uuid.uuid,
525                          cfs_atomic_read(&obd->obd_refcount));
526         }
527         read_unlock(&obd_dev_lock);
528         return;
529 }
530
531 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
532    specified, then only the client with that uuid is returned,
533    otherwise any client connected to the tgt is returned. */
534 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
535                                           const char * typ_name,
536                                           struct obd_uuid *grp_uuid)
537 {
538         int i;
539
540         read_lock(&obd_dev_lock);
541         for (i = 0; i < class_devno_max(); i++) {
542                 struct obd_device *obd = class_num2obd(i);
543
544                 if (obd == NULL)
545                         continue;
546                 if ((strncmp(obd->obd_type->typ_name, typ_name,
547                              strlen(typ_name)) == 0)) {
548                         if (obd_uuid_equals(tgt_uuid,
549                                             &obd->u.cli.cl_target_uuid) &&
550                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
551                                                          &obd->obd_uuid) : 1)) {
552                                 read_unlock(&obd_dev_lock);
553                                 return obd;
554                         }
555                 }
556         }
557         read_unlock(&obd_dev_lock);
558
559         return NULL;
560 }
561 EXPORT_SYMBOL(class_find_client_obd);
562
563 /* Iterate the obd_device list looking devices have grp_uuid. Start
564    searching at *next, and if a device is found, the next index to look
565    at is saved in *next. If next is NULL, then the first matching device
566    will always be returned. */
567 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
568 {
569         int i;
570
571         if (next == NULL)
572                 i = 0;
573         else if (*next >= 0 && *next < class_devno_max())
574                 i = *next;
575         else
576                 return NULL;
577
578         read_lock(&obd_dev_lock);
579         for (; i < class_devno_max(); i++) {
580                 struct obd_device *obd = class_num2obd(i);
581
582                 if (obd == NULL)
583                         continue;
584                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
585                         if (next != NULL)
586                                 *next = i+1;
587                         read_unlock(&obd_dev_lock);
588                         return obd;
589                 }
590         }
591         read_unlock(&obd_dev_lock);
592
593         return NULL;
594 }
595 EXPORT_SYMBOL(class_devices_in_group);
596
597 /**
598  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
599  * adjust sptlrpc settings accordingly.
600  */
601 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
602 {
603         struct obd_device  *obd;
604         const char         *type;
605         int                 i, rc = 0, rc2;
606
607         LASSERT(namelen > 0);
608
609         read_lock(&obd_dev_lock);
610         for (i = 0; i < class_devno_max(); i++) {
611                 obd = class_num2obd(i);
612
613                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
614                         continue;
615
616                 /* only notify mdc, osc, mdt, ost */
617                 type = obd->obd_type->typ_name;
618                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
619                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
620                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
621                     strcmp(type, LUSTRE_OST_NAME) != 0)
622                         continue;
623
624                 if (strncmp(obd->obd_name, fsname, namelen))
625                         continue;
626
627                 class_incref(obd, __FUNCTION__, obd);
628                 read_unlock(&obd_dev_lock);
629                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
630                                          sizeof(KEY_SPTLRPC_CONF),
631                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
632                 rc = rc ? rc : rc2;
633                 class_decref(obd, __FUNCTION__, obd);
634                 read_lock(&obd_dev_lock);
635         }
636         read_unlock(&obd_dev_lock);
637         return rc;
638 }
639 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
640
641 void obd_cleanup_caches(void)
642 {
643         int rc;
644
645         ENTRY;
646         if (obd_device_cachep) {
647                 rc = cfs_mem_cache_destroy(obd_device_cachep);
648                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
649                 obd_device_cachep = NULL;
650         }
651         if (obdo_cachep) {
652                 rc = cfs_mem_cache_destroy(obdo_cachep);
653                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
654                 obdo_cachep = NULL;
655         }
656         if (import_cachep) {
657                 rc = cfs_mem_cache_destroy(import_cachep);
658                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
659                 import_cachep = NULL;
660         }
661         if (capa_cachep) {
662                 rc = cfs_mem_cache_destroy(capa_cachep);
663                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
664                 capa_cachep = NULL;
665         }
666         EXIT;
667 }
668
669 int obd_init_caches(void)
670 {
671         ENTRY;
672
673         LASSERT(obd_device_cachep == NULL);
674         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
675                                                  sizeof(struct obd_device),
676                                                  0, 0);
677         if (!obd_device_cachep)
678                 GOTO(out, -ENOMEM);
679
680         LASSERT(obdo_cachep == NULL);
681         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
682                                            0, 0);
683         if (!obdo_cachep)
684                 GOTO(out, -ENOMEM);
685
686         LASSERT(import_cachep == NULL);
687         import_cachep = cfs_mem_cache_create("ll_import_cache",
688                                              sizeof(struct obd_import),
689                                              0, 0);
690         if (!import_cachep)
691                 GOTO(out, -ENOMEM);
692
693         LASSERT(capa_cachep == NULL);
694         capa_cachep = cfs_mem_cache_create("capa_cache",
695                                            sizeof(struct obd_capa), 0, 0);
696         if (!capa_cachep)
697                 GOTO(out, -ENOMEM);
698
699         RETURN(0);
700  out:
701         obd_cleanup_caches();
702         RETURN(-ENOMEM);
703
704 }
705
706 /* map connection to client */
707 struct obd_export *class_conn2export(struct lustre_handle *conn)
708 {
709         struct obd_export *export;
710         ENTRY;
711
712         if (!conn) {
713                 CDEBUG(D_CACHE, "looking for null handle\n");
714                 RETURN(NULL);
715         }
716
717         if (conn->cookie == -1) {  /* this means assign a new connection */
718                 CDEBUG(D_CACHE, "want a new connection\n");
719                 RETURN(NULL);
720         }
721
722         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
723         export = class_handle2object(conn->cookie);
724         RETURN(export);
725 }
726 EXPORT_SYMBOL(class_conn2export);
727
728 struct obd_device *class_exp2obd(struct obd_export *exp)
729 {
730         if (exp)
731                 return exp->exp_obd;
732         return NULL;
733 }
734 EXPORT_SYMBOL(class_exp2obd);
735
736 struct obd_device *class_conn2obd(struct lustre_handle *conn)
737 {
738         struct obd_export *export;
739         export = class_conn2export(conn);
740         if (export) {
741                 struct obd_device *obd = export->exp_obd;
742                 class_export_put(export);
743                 return obd;
744         }
745         return NULL;
746 }
747 EXPORT_SYMBOL(class_conn2obd);
748
749 struct obd_import *class_exp2cliimp(struct obd_export *exp)
750 {
751         struct obd_device *obd = exp->exp_obd;
752         if (obd == NULL)
753                 return NULL;
754         return obd->u.cli.cl_import;
755 }
756 EXPORT_SYMBOL(class_exp2cliimp);
757
758 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
759 {
760         struct obd_device *obd = class_conn2obd(conn);
761         if (obd == NULL)
762                 return NULL;
763         return obd->u.cli.cl_import;
764 }
765 EXPORT_SYMBOL(class_conn2cliimp);
766
767 /* Export management functions */
768 static void class_export_destroy(struct obd_export *exp)
769 {
770         struct obd_device *obd = exp->exp_obd;
771         ENTRY;
772
773         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
774         LASSERT(obd != NULL);
775
776         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
777                exp->exp_client_uuid.uuid, obd->obd_name);
778
779         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
780         if (exp->exp_connection)
781                 ptlrpc_put_connection_superhack(exp->exp_connection);
782
783         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
784         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
785         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
786         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
787         obd_destroy_export(exp);
788         class_decref(obd, "export", exp);
789
790         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
791         EXIT;
792 }
793
794 static void export_handle_addref(void *export)
795 {
796         class_export_get(export);
797 }
798
799 static struct portals_handle_ops export_handle_ops = {
800         .hop_addref = export_handle_addref,
801         .hop_free   = NULL,
802 };
803
804 struct obd_export *class_export_get(struct obd_export *exp)
805 {
806         cfs_atomic_inc(&exp->exp_refcount);
807         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
808                cfs_atomic_read(&exp->exp_refcount));
809         return exp;
810 }
811 EXPORT_SYMBOL(class_export_get);
812
813 void class_export_put(struct obd_export *exp)
814 {
815         LASSERT(exp != NULL);
816         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
817         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
818                cfs_atomic_read(&exp->exp_refcount) - 1);
819
820         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
821                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
822                 CDEBUG(D_IOCTL, "final put %p/%s\n",
823                        exp, exp->exp_client_uuid.uuid);
824
825                 /* release nid stat refererence */
826                 lprocfs_exp_cleanup(exp);
827
828                 obd_zombie_export_add(exp);
829         }
830 }
831 EXPORT_SYMBOL(class_export_put);
832
833 /* Creates a new export, adds it to the hash table, and returns a
834  * pointer to it. The refcount is 2: one for the hash reference, and
835  * one for the pointer returned by this function. */
836 struct obd_export *class_new_export(struct obd_device *obd,
837                                     struct obd_uuid *cluuid)
838 {
839         struct obd_export *export;
840         cfs_hash_t *hash = NULL;
841         int rc = 0;
842         ENTRY;
843
844         OBD_ALLOC_PTR(export);
845         if (!export)
846                 return ERR_PTR(-ENOMEM);
847
848         export->exp_conn_cnt = 0;
849         export->exp_lock_hash = NULL;
850         export->exp_flock_hash = NULL;
851         cfs_atomic_set(&export->exp_refcount, 2);
852         cfs_atomic_set(&export->exp_rpc_count, 0);
853         cfs_atomic_set(&export->exp_cb_count, 0);
854         cfs_atomic_set(&export->exp_locks_count, 0);
855 #if LUSTRE_TRACKS_LOCK_EXP_REFS
856         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
857         spin_lock_init(&export->exp_locks_list_guard);
858 #endif
859         cfs_atomic_set(&export->exp_replay_count, 0);
860         export->exp_obd = obd;
861         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
862         spin_lock_init(&export->exp_uncommitted_replies_lock);
863         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
864         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
865         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
866         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
867         class_handle_hash(&export->exp_handle, &export_handle_ops);
868         export->exp_last_request_time = cfs_time_current_sec();
869         spin_lock_init(&export->exp_lock);
870         spin_lock_init(&export->exp_rpc_lock);
871         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
872         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
873         spin_lock_init(&export->exp_bl_list_lock);
874         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
875
876         export->exp_sp_peer = LUSTRE_SP_ANY;
877         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
878         export->exp_client_uuid = *cluuid;
879         obd_init_export(export);
880
881         spin_lock(&obd->obd_dev_lock);
882         /* shouldn't happen, but might race */
883         if (obd->obd_stopping)
884                 GOTO(exit_unlock, rc = -ENODEV);
885
886         hash = cfs_hash_getref(obd->obd_uuid_hash);
887         if (hash == NULL)
888                 GOTO(exit_unlock, rc = -ENODEV);
889         spin_unlock(&obd->obd_dev_lock);
890
891         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
892                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
893                 if (rc != 0) {
894                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
895                                       obd->obd_name, cluuid->uuid, rc);
896                         GOTO(exit_err, rc = -EALREADY);
897                 }
898         }
899
900         spin_lock(&obd->obd_dev_lock);
901         if (obd->obd_stopping) {
902                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
903                 GOTO(exit_unlock, rc = -ENODEV);
904         }
905
906         class_incref(obd, "export", export);
907         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
908         cfs_list_add_tail(&export->exp_obd_chain_timed,
909                           &export->exp_obd->obd_exports_timed);
910         export->exp_obd->obd_num_exports++;
911         spin_unlock(&obd->obd_dev_lock);
912         cfs_hash_putref(hash);
913         RETURN(export);
914
915 exit_unlock:
916         spin_unlock(&obd->obd_dev_lock);
917 exit_err:
918         if (hash)
919                 cfs_hash_putref(hash);
920         class_handle_unhash(&export->exp_handle);
921         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
922         obd_destroy_export(export);
923         OBD_FREE_PTR(export);
924         return ERR_PTR(rc);
925 }
926 EXPORT_SYMBOL(class_new_export);
927
928 void class_unlink_export(struct obd_export *exp)
929 {
930         class_handle_unhash(&exp->exp_handle);
931
932         spin_lock(&exp->exp_obd->obd_dev_lock);
933         /* delete an uuid-export hashitem from hashtables */
934         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
935                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
936                              &exp->exp_client_uuid,
937                              &exp->exp_uuid_hash);
938
939         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
940         cfs_list_del_init(&exp->exp_obd_chain_timed);
941         exp->exp_obd->obd_num_exports--;
942         spin_unlock(&exp->exp_obd->obd_dev_lock);
943         class_export_put(exp);
944 }
945 EXPORT_SYMBOL(class_unlink_export);
946
947 /* Import management functions */
948 void class_import_destroy(struct obd_import *imp)
949 {
950         ENTRY;
951
952         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
953                 imp->imp_obd->obd_name);
954
955         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
956
957         ptlrpc_put_connection_superhack(imp->imp_connection);
958
959         while (!cfs_list_empty(&imp->imp_conn_list)) {
960                 struct obd_import_conn *imp_conn;
961
962                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
963                                           struct obd_import_conn, oic_item);
964                 cfs_list_del_init(&imp_conn->oic_item);
965                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
966                 OBD_FREE(imp_conn, sizeof(*imp_conn));
967         }
968
969         LASSERT(imp->imp_sec == NULL);
970         class_decref(imp->imp_obd, "import", imp);
971         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
972         EXIT;
973 }
974
975 static void import_handle_addref(void *import)
976 {
977         class_import_get(import);
978 }
979
980 static struct portals_handle_ops import_handle_ops = {
981         .hop_addref = import_handle_addref,
982         .hop_free   = NULL,
983 };
984
985 struct obd_import *class_import_get(struct obd_import *import)
986 {
987         cfs_atomic_inc(&import->imp_refcount);
988         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
989                cfs_atomic_read(&import->imp_refcount),
990                import->imp_obd->obd_name);
991         return import;
992 }
993 EXPORT_SYMBOL(class_import_get);
994
995 void class_import_put(struct obd_import *imp)
996 {
997         ENTRY;
998
999         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1000         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1001
1002         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1003                cfs_atomic_read(&imp->imp_refcount) - 1,
1004                imp->imp_obd->obd_name);
1005
1006         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1007                 CDEBUG(D_INFO, "final put import %p\n", imp);
1008                 obd_zombie_import_add(imp);
1009         }
1010
1011         /* catch possible import put race */
1012         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1013         EXIT;
1014 }
1015 EXPORT_SYMBOL(class_import_put);
1016
1017 static void init_imp_at(struct imp_at *at) {
1018         int i;
1019         at_init(&at->iat_net_latency, 0, 0);
1020         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1021                 /* max service estimates are tracked on the server side, so
1022                    don't use the AT history here, just use the last reported
1023                    val. (But keep hist for proc histogram, worst_ever) */
1024                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1025                         AT_FLG_NOHIST);
1026         }
1027 }
1028
1029 struct obd_import *class_new_import(struct obd_device *obd)
1030 {
1031         struct obd_import *imp;
1032
1033         OBD_ALLOC(imp, sizeof(*imp));
1034         if (imp == NULL)
1035                 return NULL;
1036
1037         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1038         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1039         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1040         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1041         spin_lock_init(&imp->imp_lock);
1042         imp->imp_last_success_conn = 0;
1043         imp->imp_state = LUSTRE_IMP_NEW;
1044         imp->imp_obd = class_incref(obd, "import", imp);
1045         mutex_init(&imp->imp_sec_mutex);
1046         cfs_waitq_init(&imp->imp_recovery_waitq);
1047
1048         cfs_atomic_set(&imp->imp_refcount, 2);
1049         cfs_atomic_set(&imp->imp_unregistering, 0);
1050         cfs_atomic_set(&imp->imp_inflight, 0);
1051         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1052         cfs_atomic_set(&imp->imp_inval_count, 0);
1053         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1054         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1055         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1056         init_imp_at(&imp->imp_at);
1057
1058         /* the default magic is V2, will be used in connect RPC, and
1059          * then adjusted according to the flags in request/reply. */
1060         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1061
1062         return imp;
1063 }
1064 EXPORT_SYMBOL(class_new_import);
1065
1066 void class_destroy_import(struct obd_import *import)
1067 {
1068         LASSERT(import != NULL);
1069         LASSERT(import != LP_POISON);
1070
1071         class_handle_unhash(&import->imp_handle);
1072
1073         spin_lock(&import->imp_lock);
1074         import->imp_generation++;
1075         spin_unlock(&import->imp_lock);
1076         class_import_put(import);
1077 }
1078 EXPORT_SYMBOL(class_destroy_import);
1079
1080 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1081
1082 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1083 {
1084         spin_lock(&exp->exp_locks_list_guard);
1085
1086         LASSERT(lock->l_exp_refs_nr >= 0);
1087
1088         if (lock->l_exp_refs_target != NULL &&
1089             lock->l_exp_refs_target != exp) {
1090                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1091                               exp, lock, lock->l_exp_refs_target);
1092         }
1093         if ((lock->l_exp_refs_nr ++) == 0) {
1094                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1095                 lock->l_exp_refs_target = exp;
1096         }
1097         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1098                lock, exp, lock->l_exp_refs_nr);
1099         spin_unlock(&exp->exp_locks_list_guard);
1100 }
1101 EXPORT_SYMBOL(__class_export_add_lock_ref);
1102
1103 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1104 {
1105         spin_lock(&exp->exp_locks_list_guard);
1106         LASSERT(lock->l_exp_refs_nr > 0);
1107         if (lock->l_exp_refs_target != exp) {
1108                 LCONSOLE_WARN("lock %p, "
1109                               "mismatching export pointers: %p, %p\n",
1110                               lock, lock->l_exp_refs_target, exp);
1111         }
1112         if (-- lock->l_exp_refs_nr == 0) {
1113                 cfs_list_del_init(&lock->l_exp_refs_link);
1114                 lock->l_exp_refs_target = NULL;
1115         }
1116         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1117                lock, exp, lock->l_exp_refs_nr);
1118         spin_unlock(&exp->exp_locks_list_guard);
1119 }
1120 EXPORT_SYMBOL(__class_export_del_lock_ref);
1121 #endif
1122
1123 /* A connection defines an export context in which preallocation can
1124    be managed. This releases the export pointer reference, and returns
1125    the export handle, so the export refcount is 1 when this function
1126    returns. */
1127 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1128                   struct obd_uuid *cluuid)
1129 {
1130         struct obd_export *export;
1131         LASSERT(conn != NULL);
1132         LASSERT(obd != NULL);
1133         LASSERT(cluuid != NULL);
1134         ENTRY;
1135
1136         export = class_new_export(obd, cluuid);
1137         if (IS_ERR(export))
1138                 RETURN(PTR_ERR(export));
1139
1140         conn->cookie = export->exp_handle.h_cookie;
1141         class_export_put(export);
1142
1143         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1144                cluuid->uuid, conn->cookie);
1145         RETURN(0);
1146 }
1147 EXPORT_SYMBOL(class_connect);
1148
1149 /* if export is involved in recovery then clean up related things */
1150 void class_export_recovery_cleanup(struct obd_export *exp)
1151 {
1152         struct obd_device *obd = exp->exp_obd;
1153
1154         spin_lock(&obd->obd_recovery_task_lock);
1155         if (exp->exp_delayed)
1156                 obd->obd_delayed_clients--;
1157         if (obd->obd_recovering) {
1158                 if (exp->exp_in_recovery) {
1159                         spin_lock(&exp->exp_lock);
1160                         exp->exp_in_recovery = 0;
1161                         spin_unlock(&exp->exp_lock);
1162                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1163                         cfs_atomic_dec(&obd->obd_connected_clients);
1164                 }
1165
1166                 /* if called during recovery then should update
1167                  * obd_stale_clients counter,
1168                  * lightweight exports are not counted */
1169                 if (exp->exp_failed &&
1170                     (exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) == 0)
1171                         exp->exp_obd->obd_stale_clients++;
1172         }
1173         spin_unlock(&obd->obd_recovery_task_lock);
1174         /** Cleanup req replay fields */
1175         if (exp->exp_req_replay_needed) {
1176                 spin_lock(&exp->exp_lock);
1177                 exp->exp_req_replay_needed = 0;
1178                 spin_unlock(&exp->exp_lock);
1179                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1180                 cfs_atomic_dec(&obd->obd_req_replay_clients);
1181         }
1182         /** Cleanup lock replay data */
1183         if (exp->exp_lock_replay_needed) {
1184                 spin_lock(&exp->exp_lock);
1185                 exp->exp_lock_replay_needed = 0;
1186                 spin_unlock(&exp->exp_lock);
1187                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1188                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1189         }
1190 }
1191
1192 /* This function removes 1-3 references from the export:
1193  * 1 - for export pointer passed
1194  * and if disconnect really need
1195  * 2 - removing from hash
1196  * 3 - in client_unlink_export
1197  * The export pointer passed to this function can destroyed */
1198 int class_disconnect(struct obd_export *export)
1199 {
1200         int already_disconnected;
1201         ENTRY;
1202
1203         if (export == NULL) {
1204                 CWARN("attempting to free NULL export %p\n", export);
1205                 RETURN(-EINVAL);
1206         }
1207
1208         spin_lock(&export->exp_lock);
1209         already_disconnected = export->exp_disconnected;
1210         export->exp_disconnected = 1;
1211         spin_unlock(&export->exp_lock);
1212
1213         /* class_cleanup(), abort_recovery(), and class_fail_export()
1214          * all end up in here, and if any of them race we shouldn't
1215          * call extra class_export_puts(). */
1216         if (already_disconnected) {
1217                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1218                 GOTO(no_disconn, already_disconnected);
1219         }
1220
1221         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1222                export->exp_handle.h_cookie);
1223
1224         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1225                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1226                              &export->exp_connection->c_peer.nid,
1227                              &export->exp_nid_hash);
1228
1229         class_export_recovery_cleanup(export);
1230         class_unlink_export(export);
1231 no_disconn:
1232         class_export_put(export);
1233         RETURN(0);
1234 }
1235 EXPORT_SYMBOL(class_disconnect);
1236
1237 /* Return non-zero for a fully connected export */
1238 int class_connected_export(struct obd_export *exp)
1239 {
1240         if (exp) {
1241                 int connected;
1242                 spin_lock(&exp->exp_lock);
1243                 connected = (exp->exp_conn_cnt > 0);
1244                 spin_unlock(&exp->exp_lock);
1245                 return connected;
1246         }
1247         return 0;
1248 }
1249 EXPORT_SYMBOL(class_connected_export);
1250
1251 static void class_disconnect_export_list(cfs_list_t *list,
1252                                          enum obd_option flags)
1253 {
1254         int rc;
1255         struct obd_export *exp;
1256         ENTRY;
1257
1258         /* It's possible that an export may disconnect itself, but
1259          * nothing else will be added to this list. */
1260         while (!cfs_list_empty(list)) {
1261                 exp = cfs_list_entry(list->next, struct obd_export,
1262                                      exp_obd_chain);
1263                 /* need for safe call CDEBUG after obd_disconnect */
1264                 class_export_get(exp);
1265
1266                 spin_lock(&exp->exp_lock);
1267                 exp->exp_flags = flags;
1268                 spin_unlock(&exp->exp_lock);
1269
1270                 if (obd_uuid_equals(&exp->exp_client_uuid,
1271                                     &exp->exp_obd->obd_uuid)) {
1272                         CDEBUG(D_HA,
1273                                "exp %p export uuid == obd uuid, don't discon\n",
1274                                exp);
1275                         /* Need to delete this now so we don't end up pointing
1276                          * to work_list later when this export is cleaned up. */
1277                         cfs_list_del_init(&exp->exp_obd_chain);
1278                         class_export_put(exp);
1279                         continue;
1280                 }
1281
1282                 class_export_get(exp);
1283                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1284                        "last request at "CFS_TIME_T"\n",
1285                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1286                        exp, exp->exp_last_request_time);
1287                 /* release one export reference anyway */
1288                 rc = obd_disconnect(exp);
1289
1290                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1291                        obd_export_nid2str(exp), exp, rc);
1292                 class_export_put(exp);
1293         }
1294         EXIT;
1295 }
1296
1297 void class_disconnect_exports(struct obd_device *obd)
1298 {
1299         cfs_list_t work_list;
1300         ENTRY;
1301
1302         /* Move all of the exports from obd_exports to a work list, en masse. */
1303         CFS_INIT_LIST_HEAD(&work_list);
1304         spin_lock(&obd->obd_dev_lock);
1305         cfs_list_splice_init(&obd->obd_exports, &work_list);
1306         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1307         spin_unlock(&obd->obd_dev_lock);
1308
1309         if (!cfs_list_empty(&work_list)) {
1310                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1311                        "disconnecting them\n", obd->obd_minor, obd);
1312                 class_disconnect_export_list(&work_list,
1313                                              exp_flags_from_obd(obd));
1314         } else
1315                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1316                        obd->obd_minor, obd);
1317         EXIT;
1318 }
1319 EXPORT_SYMBOL(class_disconnect_exports);
1320
1321 /* Remove exports that have not completed recovery.
1322  */
1323 void class_disconnect_stale_exports(struct obd_device *obd,
1324                                     int (*test_export)(struct obd_export *))
1325 {
1326         cfs_list_t work_list;
1327         struct obd_export *exp, *n;
1328         int evicted = 0;
1329         ENTRY;
1330
1331         CFS_INIT_LIST_HEAD(&work_list);
1332         spin_lock(&obd->obd_dev_lock);
1333         cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1334                                      exp_obd_chain) {
1335                 /* don't count self-export as client */
1336                 if (obd_uuid_equals(&exp->exp_client_uuid,
1337                                     &exp->exp_obd->obd_uuid))
1338                         continue;
1339
1340                 /* don't evict clients which have no slot in last_rcvd
1341                  * (e.g. lightweight connection) */
1342                 if (exp->exp_target_data.ted_lr_idx == -1)
1343                         continue;
1344
1345                 spin_lock(&exp->exp_lock);
1346                 if (exp->exp_failed || test_export(exp)) {
1347                         spin_unlock(&exp->exp_lock);
1348                         continue;
1349                 }
1350                 exp->exp_failed = 1;
1351                 spin_unlock(&exp->exp_lock);
1352
1353                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1354                 evicted++;
1355                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1356                        obd->obd_name, exp->exp_client_uuid.uuid,
1357                        exp->exp_connection == NULL ? "<unknown>" :
1358                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1359                 print_export_data(exp, "EVICTING", 0);
1360         }
1361         spin_unlock(&obd->obd_dev_lock);
1362
1363         if (evicted)
1364                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1365                               obd->obd_name, evicted);
1366
1367         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1368                                                  OBD_OPT_ABORT_RECOV);
1369         EXIT;
1370 }
1371 EXPORT_SYMBOL(class_disconnect_stale_exports);
1372
1373 void class_fail_export(struct obd_export *exp)
1374 {
1375         int rc, already_failed;
1376
1377         spin_lock(&exp->exp_lock);
1378         already_failed = exp->exp_failed;
1379         exp->exp_failed = 1;
1380         spin_unlock(&exp->exp_lock);
1381
1382         if (already_failed) {
1383                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1384                        exp, exp->exp_client_uuid.uuid);
1385                 return;
1386         }
1387
1388         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1389                exp, exp->exp_client_uuid.uuid);
1390
1391         if (obd_dump_on_timeout)
1392                 libcfs_debug_dumplog();
1393
1394         /* need for safe call CDEBUG after obd_disconnect */
1395         class_export_get(exp);
1396
1397         /* Most callers into obd_disconnect are removing their own reference
1398          * (request, for example) in addition to the one from the hash table.
1399          * We don't have such a reference here, so make one. */
1400         class_export_get(exp);
1401         rc = obd_disconnect(exp);
1402         if (rc)
1403                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1404         else
1405                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1406                        exp, exp->exp_client_uuid.uuid);
1407         class_export_put(exp);
1408 }
1409 EXPORT_SYMBOL(class_fail_export);
1410
1411 char *obd_export_nid2str(struct obd_export *exp)
1412 {
1413         if (exp->exp_connection != NULL)
1414                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1415
1416         return "(no nid)";
1417 }
1418 EXPORT_SYMBOL(obd_export_nid2str);
1419
1420 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1421 {
1422         cfs_hash_t *nid_hash;
1423         struct obd_export *doomed_exp = NULL;
1424         int exports_evicted = 0;
1425
1426         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1427
1428         spin_lock(&obd->obd_dev_lock);
1429         /* umount has run already, so evict thread should leave
1430          * its task to umount thread now */
1431         if (obd->obd_stopping) {
1432                 spin_unlock(&obd->obd_dev_lock);
1433                 return exports_evicted;
1434         }
1435         nid_hash = obd->obd_nid_hash;
1436         cfs_hash_getref(nid_hash);
1437         spin_unlock(&obd->obd_dev_lock);
1438
1439         do {
1440                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1441                 if (doomed_exp == NULL)
1442                         break;
1443
1444                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1445                          "nid %s found, wanted nid %s, requested nid %s\n",
1446                          obd_export_nid2str(doomed_exp),
1447                          libcfs_nid2str(nid_key), nid);
1448                 LASSERTF(doomed_exp != obd->obd_self_export,
1449                          "self-export is hashed by NID?\n");
1450                 exports_evicted++;
1451                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1452                               "request\n", obd->obd_name,
1453                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1454                               obd_export_nid2str(doomed_exp));
1455                 class_fail_export(doomed_exp);
1456                 class_export_put(doomed_exp);
1457         } while (1);
1458
1459         cfs_hash_putref(nid_hash);
1460
1461         if (!exports_evicted)
1462                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1463                        obd->obd_name, nid);
1464         return exports_evicted;
1465 }
1466 EXPORT_SYMBOL(obd_export_evict_by_nid);
1467
1468 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1469 {
1470         cfs_hash_t *uuid_hash;
1471         struct obd_export *doomed_exp = NULL;
1472         struct obd_uuid doomed_uuid;
1473         int exports_evicted = 0;
1474
1475         spin_lock(&obd->obd_dev_lock);
1476         if (obd->obd_stopping) {
1477                 spin_unlock(&obd->obd_dev_lock);
1478                 return exports_evicted;
1479         }
1480         uuid_hash = obd->obd_uuid_hash;
1481         cfs_hash_getref(uuid_hash);
1482         spin_unlock(&obd->obd_dev_lock);
1483
1484         obd_str2uuid(&doomed_uuid, uuid);
1485         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1486                 CERROR("%s: can't evict myself\n", obd->obd_name);
1487                 cfs_hash_putref(uuid_hash);
1488                 return exports_evicted;
1489         }
1490
1491         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1492
1493         if (doomed_exp == NULL) {
1494                 CERROR("%s: can't disconnect %s: no exports found\n",
1495                        obd->obd_name, uuid);
1496         } else {
1497                 CWARN("%s: evicting %s at adminstrative request\n",
1498                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1499                 class_fail_export(doomed_exp);
1500                 class_export_put(doomed_exp);
1501                 exports_evicted++;
1502         }
1503         cfs_hash_putref(uuid_hash);
1504
1505         return exports_evicted;
1506 }
1507 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1508
1509 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1510 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1511 EXPORT_SYMBOL(class_export_dump_hook);
1512 #endif
1513
1514 static void print_export_data(struct obd_export *exp, const char *status,
1515                               int locks)
1516 {
1517         struct ptlrpc_reply_state *rs;
1518         struct ptlrpc_reply_state *first_reply = NULL;
1519         int nreplies = 0;
1520
1521         spin_lock(&exp->exp_lock);
1522         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1523                                 rs_exp_list) {
1524                 if (nreplies == 0)
1525                         first_reply = rs;
1526                 nreplies++;
1527         }
1528         spin_unlock(&exp->exp_lock);
1529
1530         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1531                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1532                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1533                cfs_atomic_read(&exp->exp_rpc_count),
1534                cfs_atomic_read(&exp->exp_cb_count),
1535                cfs_atomic_read(&exp->exp_locks_count),
1536                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1537                nreplies, first_reply, nreplies > 3 ? "..." : "",
1538                exp->exp_last_committed);
1539 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1540         if (locks && class_export_dump_hook != NULL)
1541                 class_export_dump_hook(exp);
1542 #endif
1543 }
1544
1545 void dump_exports(struct obd_device *obd, int locks)
1546 {
1547         struct obd_export *exp;
1548
1549         spin_lock(&obd->obd_dev_lock);
1550         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1551                 print_export_data(exp, "ACTIVE", locks);
1552         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1553                 print_export_data(exp, "UNLINKED", locks);
1554         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1555                 print_export_data(exp, "DELAYED", locks);
1556         spin_unlock(&obd->obd_dev_lock);
1557         spin_lock(&obd_zombie_impexp_lock);
1558         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1559                 print_export_data(exp, "ZOMBIE", locks);
1560         spin_unlock(&obd_zombie_impexp_lock);
1561 }
1562 EXPORT_SYMBOL(dump_exports);
1563
1564 void obd_exports_barrier(struct obd_device *obd)
1565 {
1566         int waited = 2;
1567         LASSERT(cfs_list_empty(&obd->obd_exports));
1568         spin_lock(&obd->obd_dev_lock);
1569         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1570                 spin_unlock(&obd->obd_dev_lock);
1571                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1572                                                    cfs_time_seconds(waited));
1573                 if (waited > 5 && IS_PO2(waited)) {
1574                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1575                                       "more than %d seconds. "
1576                                       "The obd refcount = %d. Is it stuck?\n",
1577                                       obd->obd_name, waited,
1578                                       cfs_atomic_read(&obd->obd_refcount));
1579                         dump_exports(obd, 1);
1580                 }
1581                 waited *= 2;
1582                 spin_lock(&obd->obd_dev_lock);
1583         }
1584         spin_unlock(&obd->obd_dev_lock);
1585 }
1586 EXPORT_SYMBOL(obd_exports_barrier);
1587
1588 /* Total amount of zombies to be destroyed */
1589 static int zombies_count = 0;
1590
1591 /**
1592  * kill zombie imports and exports
1593  */
1594 void obd_zombie_impexp_cull(void)
1595 {
1596         struct obd_import *import;
1597         struct obd_export *export;
1598         ENTRY;
1599
1600         do {
1601                 spin_lock(&obd_zombie_impexp_lock);
1602
1603                 import = NULL;
1604                 if (!cfs_list_empty(&obd_zombie_imports)) {
1605                         import = cfs_list_entry(obd_zombie_imports.next,
1606                                                 struct obd_import,
1607                                                 imp_zombie_chain);
1608                         cfs_list_del_init(&import->imp_zombie_chain);
1609                 }
1610
1611                 export = NULL;
1612                 if (!cfs_list_empty(&obd_zombie_exports)) {
1613                         export = cfs_list_entry(obd_zombie_exports.next,
1614                                                 struct obd_export,
1615                                                 exp_obd_chain);
1616                         cfs_list_del_init(&export->exp_obd_chain);
1617                 }
1618
1619                 spin_unlock(&obd_zombie_impexp_lock);
1620
1621                 if (import != NULL) {
1622                         class_import_destroy(import);
1623                         spin_lock(&obd_zombie_impexp_lock);
1624                         zombies_count--;
1625                         spin_unlock(&obd_zombie_impexp_lock);
1626                 }
1627
1628                 if (export != NULL) {
1629                         class_export_destroy(export);
1630                         spin_lock(&obd_zombie_impexp_lock);
1631                         zombies_count--;
1632                         spin_unlock(&obd_zombie_impexp_lock);
1633                 }
1634
1635                 cfs_cond_resched();
1636         } while (import != NULL || export != NULL);
1637         EXIT;
1638 }
1639
1640 static struct completion        obd_zombie_start;
1641 static struct completion        obd_zombie_stop;
1642 static unsigned long            obd_zombie_flags;
1643 static cfs_waitq_t              obd_zombie_waitq;
1644 static pid_t                    obd_zombie_pid;
1645
1646 enum {
1647         OBD_ZOMBIE_STOP         = 0x0001,
1648 };
1649
1650 /**
1651  * check for work for kill zombie import/export thread.
1652  */
1653 static int obd_zombie_impexp_check(void *arg)
1654 {
1655         int rc;
1656
1657         spin_lock(&obd_zombie_impexp_lock);
1658         rc = (zombies_count == 0) &&
1659              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1660         spin_unlock(&obd_zombie_impexp_lock);
1661
1662         RETURN(rc);
1663 }
1664
1665 /**
1666  * Add export to the obd_zombe thread and notify it.
1667  */
1668 static void obd_zombie_export_add(struct obd_export *exp) {
1669         spin_lock(&exp->exp_obd->obd_dev_lock);
1670         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1671         cfs_list_del_init(&exp->exp_obd_chain);
1672         spin_unlock(&exp->exp_obd->obd_dev_lock);
1673         spin_lock(&obd_zombie_impexp_lock);
1674         zombies_count++;
1675         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1676         spin_unlock(&obd_zombie_impexp_lock);
1677
1678         obd_zombie_impexp_notify();
1679 }
1680
1681 /**
1682  * Add import to the obd_zombe thread and notify it.
1683  */
1684 static void obd_zombie_import_add(struct obd_import *imp) {
1685         LASSERT(imp->imp_sec == NULL);
1686         LASSERT(imp->imp_rq_pool == NULL);
1687         spin_lock(&obd_zombie_impexp_lock);
1688         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1689         zombies_count++;
1690         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1691         spin_unlock(&obd_zombie_impexp_lock);
1692
1693         obd_zombie_impexp_notify();
1694 }
1695
1696 /**
1697  * notify import/export destroy thread about new zombie.
1698  */
1699 static void obd_zombie_impexp_notify(void)
1700 {
1701         /*
1702          * Make sure obd_zomebie_impexp_thread get this notification.
1703          * It is possible this signal only get by obd_zombie_barrier, and
1704          * barrier gulps this notification and sleeps away and hangs ensues
1705          */
1706         cfs_waitq_broadcast(&obd_zombie_waitq);
1707 }
1708
1709 /**
1710  * check whether obd_zombie is idle
1711  */
1712 static int obd_zombie_is_idle(void)
1713 {
1714         int rc;
1715
1716         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1717         spin_lock(&obd_zombie_impexp_lock);
1718         rc = (zombies_count == 0);
1719         spin_unlock(&obd_zombie_impexp_lock);
1720         return rc;
1721 }
1722
1723 /**
1724  * wait when obd_zombie import/export queues become empty
1725  */
1726 void obd_zombie_barrier(void)
1727 {
1728         struct l_wait_info lwi = { 0 };
1729
1730         if (obd_zombie_pid == cfs_curproc_pid())
1731                 /* don't wait for myself */
1732                 return;
1733         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1734 }
1735 EXPORT_SYMBOL(obd_zombie_barrier);
1736
1737 #ifdef __KERNEL__
1738
1739 /**
1740  * destroy zombie export/import thread.
1741  */
1742 static int obd_zombie_impexp_thread(void *unused)
1743 {
1744         int rc;
1745
1746         rc = cfs_daemonize_ctxt("obd_zombid");
1747         if (rc != 0) {
1748                 complete(&obd_zombie_start);
1749                 RETURN(rc);
1750         }
1751
1752         complete(&obd_zombie_start);
1753
1754         obd_zombie_pid = cfs_curproc_pid();
1755
1756         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1757                 struct l_wait_info lwi = { 0 };
1758
1759                 l_wait_event(obd_zombie_waitq,
1760                              !obd_zombie_impexp_check(NULL), &lwi);
1761                 obd_zombie_impexp_cull();
1762
1763                 /*
1764                  * Notify obd_zombie_barrier callers that queues
1765                  * may be empty.
1766                  */
1767                 cfs_waitq_signal(&obd_zombie_waitq);
1768         }
1769
1770         complete(&obd_zombie_stop);
1771
1772         RETURN(0);
1773 }
1774
1775 #else /* ! KERNEL */
1776
1777 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1778 static void *obd_zombie_impexp_work_cb;
1779 static void *obd_zombie_impexp_idle_cb;
1780
1781 int obd_zombie_impexp_kill(void *arg)
1782 {
1783         int rc = 0;
1784
1785         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1786                 obd_zombie_impexp_cull();
1787                 rc = 1;
1788         }
1789         cfs_atomic_dec(&zombie_recur);
1790         return rc;
1791 }
1792
1793 #endif
1794
1795 /**
1796  * start destroy zombie import/export thread
1797  */
1798 int obd_zombie_impexp_init(void)
1799 {
1800         int rc;
1801
1802         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1803         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1804         spin_lock_init(&obd_zombie_impexp_lock);
1805         init_completion(&obd_zombie_start);
1806         init_completion(&obd_zombie_stop);
1807         cfs_waitq_init(&obd_zombie_waitq);
1808         obd_zombie_pid = 0;
1809
1810 #ifdef __KERNEL__
1811         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1812         if (rc < 0)
1813                 RETURN(rc);
1814
1815         wait_for_completion(&obd_zombie_start);
1816 #else
1817
1818         obd_zombie_impexp_work_cb =
1819                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1820                                                  &obd_zombie_impexp_kill, NULL);
1821
1822         obd_zombie_impexp_idle_cb =
1823                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1824                                                  &obd_zombie_impexp_check, NULL);
1825         rc = 0;
1826 #endif
1827         RETURN(rc);
1828 }
1829 /**
1830  * stop destroy zombie import/export thread
1831  */
1832 void obd_zombie_impexp_stop(void)
1833 {
1834         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1835         obd_zombie_impexp_notify();
1836 #ifdef __KERNEL__
1837         wait_for_completion(&obd_zombie_stop);
1838 #else
1839         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1840         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1841 #endif
1842 }
1843
1844 /***** Kernel-userspace comm helpers *******/
1845
1846 /* Get length of entire message, including header */
1847 int kuc_len(int payload_len)
1848 {
1849         return sizeof(struct kuc_hdr) + payload_len;
1850 }
1851 EXPORT_SYMBOL(kuc_len);
1852
1853 /* Get a pointer to kuc header, given a ptr to the payload
1854  * @param p Pointer to payload area
1855  * @returns Pointer to kuc header
1856  */
1857 struct kuc_hdr * kuc_ptr(void *p)
1858 {
1859         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1860         LASSERT(lh->kuc_magic == KUC_MAGIC);
1861         return lh;
1862 }
1863 EXPORT_SYMBOL(kuc_ptr);
1864
1865 /* Test if payload is part of kuc message
1866  * @param p Pointer to payload area
1867  * @returns boolean
1868  */
1869 int kuc_ispayload(void *p)
1870 {
1871         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1872
1873         if (kh->kuc_magic == KUC_MAGIC)
1874                 return 1;
1875         else
1876                 return 0;
1877 }
1878 EXPORT_SYMBOL(kuc_ispayload);
1879
1880 /* Alloc space for a message, and fill in header
1881  * @return Pointer to payload area
1882  */
1883 void *kuc_alloc(int payload_len, int transport, int type)
1884 {
1885         struct kuc_hdr *lh;
1886         int len = kuc_len(payload_len);
1887
1888         OBD_ALLOC(lh, len);
1889         if (lh == NULL)
1890                 return ERR_PTR(-ENOMEM);
1891
1892         lh->kuc_magic = KUC_MAGIC;
1893         lh->kuc_transport = transport;
1894         lh->kuc_msgtype = type;
1895         lh->kuc_msglen = len;
1896
1897         return (void *)(lh + 1);
1898 }
1899 EXPORT_SYMBOL(kuc_alloc);
1900
1901 /* Takes pointer to payload area */
1902 inline void kuc_free(void *p, int payload_len)
1903 {
1904         struct kuc_hdr *lh = kuc_ptr(p);
1905         OBD_FREE(lh, kuc_len(payload_len));
1906 }
1907 EXPORT_SYMBOL(kuc_free);
1908
1909
1910