Whamcloud - gitweb
d39caa9940a79f6298b7b60bc5604730dd018590
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
50 extern cfs_list_t obd_types;
51 spinlock_t obd_types_lock;
52
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
57
58 cfs_list_t      obd_zombie_imports;
59 cfs_list_t      obd_zombie_exports;
60 spinlock_t  obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         cfs_list_t *tmp;
102         struct obd_type *type;
103
104         spin_lock(&obd_types_lock);
105         cfs_list_for_each(tmp, &obd_types) {
106                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115 EXPORT_SYMBOL(class_search_type);
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124
125                 if (strcmp(modname, "obdfilter") == 0)
126                         modname = "ofd";
127
128                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
129                         modname = LUSTRE_MDT_NAME;
130
131                 if (!cfs_request_module("%s", modname)) {
132                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
133                         type = class_search_type(name);
134                 } else {
135                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
136                                            modname);
137                 }
138         }
139 #endif
140         if (type) {
141                 spin_lock(&type->obd_type_lock);
142                 type->typ_refcnt++;
143                 cfs_try_module_get(type->typ_dt_ops->o_owner);
144                 spin_unlock(&type->obd_type_lock);
145         }
146         return type;
147 }
148 EXPORT_SYMBOL(class_get_type);
149
150 void class_put_type(struct obd_type *type)
151 {
152         LASSERT(type);
153         spin_lock(&type->obd_type_lock);
154         type->typ_refcnt--;
155         cfs_module_put(type->typ_dt_ops->o_owner);
156         spin_unlock(&type->obd_type_lock);
157 }
158 EXPORT_SYMBOL(class_put_type);
159
160 #define CLASS_MAX_NAME 1024
161
162 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
163                         struct lprocfs_vars *vars, const char *name,
164                         struct lu_device_type *ldt)
165 {
166         struct obd_type *type;
167         int rc = 0;
168         ENTRY;
169
170         /* sanity check */
171         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
172
173         if (class_search_type(name)) {
174                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
175                 RETURN(-EEXIST);
176         }
177
178         rc = -ENOMEM;
179         OBD_ALLOC(type, sizeof(*type));
180         if (type == NULL)
181                 RETURN(rc);
182
183         OBD_ALLOC_PTR(type->typ_dt_ops);
184         OBD_ALLOC_PTR(type->typ_md_ops);
185         OBD_ALLOC(type->typ_name, strlen(name) + 1);
186
187         if (type->typ_dt_ops == NULL ||
188             type->typ_md_ops == NULL ||
189             type->typ_name == NULL)
190                 GOTO (failed, rc);
191
192         *(type->typ_dt_ops) = *dt_ops;
193         /* md_ops is optional */
194         if (md_ops)
195                 *(type->typ_md_ops) = *md_ops;
196         strcpy(type->typ_name, name);
197         spin_lock_init(&type->obd_type_lock);
198
199 #ifdef LPROCFS
200         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
201                                               vars, type);
202         if (IS_ERR(type->typ_procroot)) {
203                 rc = PTR_ERR(type->typ_procroot);
204                 type->typ_procroot = NULL;
205                 GOTO (failed, rc);
206         }
207 #endif
208         if (ldt != NULL) {
209                 type->typ_lu = ldt;
210                 rc = lu_device_type_init(ldt);
211                 if (rc != 0)
212                         GOTO (failed, rc);
213         }
214
215         spin_lock(&obd_types_lock);
216         cfs_list_add(&type->typ_chain, &obd_types);
217         spin_unlock(&obd_types_lock);
218
219         RETURN (0);
220
221  failed:
222         if (type->typ_name != NULL)
223                 OBD_FREE(type->typ_name, strlen(name) + 1);
224         if (type->typ_md_ops != NULL)
225                 OBD_FREE_PTR(type->typ_md_ops);
226         if (type->typ_dt_ops != NULL)
227                 OBD_FREE_PTR(type->typ_dt_ops);
228         OBD_FREE(type, sizeof(*type));
229         RETURN(rc);
230 }
231 EXPORT_SYMBOL(class_register_type);
232
233 int class_unregister_type(const char *name)
234 {
235         struct obd_type *type = class_search_type(name);
236         ENTRY;
237
238         if (!type) {
239                 CERROR("unknown obd type\n");
240                 RETURN(-EINVAL);
241         }
242
243         if (type->typ_refcnt) {
244                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
245                 /* This is a bad situation, let's make the best of it */
246                 /* Remove ops, but leave the name for debugging */
247                 OBD_FREE_PTR(type->typ_dt_ops);
248                 OBD_FREE_PTR(type->typ_md_ops);
249                 RETURN(-EBUSY);
250         }
251
252         /* we do not use type->typ_procroot as for compatibility purposes
253          * other modules can share names (i.e. lod can use lov entry). so
254          * we can't reference pointer as it can get invalided when another
255          * module removes the entry */
256         lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
257
258         if (type->typ_lu)
259                 lu_device_type_fini(type->typ_lu);
260
261         spin_lock(&obd_types_lock);
262         cfs_list_del(&type->typ_chain);
263         spin_unlock(&obd_types_lock);
264         OBD_FREE(type->typ_name, strlen(name) + 1);
265         if (type->typ_dt_ops != NULL)
266                 OBD_FREE_PTR(type->typ_dt_ops);
267         if (type->typ_md_ops != NULL)
268                 OBD_FREE_PTR(type->typ_md_ops);
269         OBD_FREE(type, sizeof(*type));
270         RETURN(0);
271 } /* class_unregister_type */
272 EXPORT_SYMBOL(class_unregister_type);
273
274 /**
275  * Create a new obd device.
276  *
277  * Find an empty slot in ::obd_devs[], create a new obd device in it.
278  *
279  * \param[in] type_name obd device type string.
280  * \param[in] name      obd device name.
281  *
282  * \retval NULL if create fails, otherwise return the obd device
283  *         pointer created.
284  */
285 struct obd_device *class_newdev(const char *type_name, const char *name)
286 {
287         struct obd_device *result = NULL;
288         struct obd_device *newdev;
289         struct obd_type *type = NULL;
290         int i;
291         int new_obd_minor = 0;
292         ENTRY;
293
294         if (strlen(name) >= MAX_OBD_NAME) {
295                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
296                 RETURN(ERR_PTR(-EINVAL));
297         }
298
299         type = class_get_type(type_name);
300         if (type == NULL){
301                 CERROR("OBD: unknown type: %s\n", type_name);
302                 RETURN(ERR_PTR(-ENODEV));
303         }
304
305         newdev = obd_device_alloc();
306         if (newdev == NULL)
307                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
308
309         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
310
311         write_lock(&obd_dev_lock);
312         for (i = 0; i < class_devno_max(); i++) {
313                 struct obd_device *obd = class_num2obd(i);
314
315                 if (obd && obd->obd_name &&
316                     (strcmp(name, obd->obd_name) == 0)) {
317                         CERROR("Device %s already exists at %d, won't add\n",
318                                name, i);
319                         if (result) {
320                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
321                                          "%p obd_magic %08x != %08x\n", result,
322                                          result->obd_magic, OBD_DEVICE_MAGIC);
323                                 LASSERTF(result->obd_minor == new_obd_minor,
324                                          "%p obd_minor %d != %d\n", result,
325                                          result->obd_minor, new_obd_minor);
326
327                                 obd_devs[result->obd_minor] = NULL;
328                                 result->obd_name[0]='\0';
329                          }
330                         result = ERR_PTR(-EEXIST);
331                         break;
332                 }
333                 if (!result && !obd) {
334                         result = newdev;
335                         result->obd_minor = i;
336                         new_obd_minor = i;
337                         result->obd_type = type;
338                         strncpy(result->obd_name, name,
339                                 sizeof(result->obd_name) - 1);
340                         obd_devs[i] = result;
341                 }
342         }
343         write_unlock(&obd_dev_lock);
344
345         if (result == NULL && i >= class_devno_max()) {
346                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
347                        class_devno_max());
348                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
349         }
350
351         if (IS_ERR(result))
352                 GOTO(out, result);
353
354         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
355                result->obd_name, result);
356
357         RETURN(result);
358 out:
359         obd_device_free(newdev);
360 out_type:
361         class_put_type(type);
362         return result;
363 }
364
365 void class_release_dev(struct obd_device *obd)
366 {
367         struct obd_type *obd_type = obd->obd_type;
368
369         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
370                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
371         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
372                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
373         LASSERT(obd_type != NULL);
374
375         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
376                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
377
378         write_lock(&obd_dev_lock);
379         obd_devs[obd->obd_minor] = NULL;
380         write_unlock(&obd_dev_lock);
381         obd_device_free(obd);
382
383         class_put_type(obd_type);
384 }
385
386 int class_name2dev(const char *name)
387 {
388         int i;
389
390         if (!name)
391                 return -1;
392
393         read_lock(&obd_dev_lock);
394         for (i = 0; i < class_devno_max(); i++) {
395                 struct obd_device *obd = class_num2obd(i);
396
397                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
398                         /* Make sure we finished attaching before we give
399                            out any references */
400                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
401                         if (obd->obd_attached) {
402                                 read_unlock(&obd_dev_lock);
403                                 return i;
404                         }
405                         break;
406                 }
407         }
408         read_unlock(&obd_dev_lock);
409
410         return -1;
411 }
412 EXPORT_SYMBOL(class_name2dev);
413
414 struct obd_device *class_name2obd(const char *name)
415 {
416         int dev = class_name2dev(name);
417
418         if (dev < 0 || dev > class_devno_max())
419                 return NULL;
420         return class_num2obd(dev);
421 }
422 EXPORT_SYMBOL(class_name2obd);
423
424 int class_uuid2dev(struct obd_uuid *uuid)
425 {
426         int i;
427
428         read_lock(&obd_dev_lock);
429         for (i = 0; i < class_devno_max(); i++) {
430                 struct obd_device *obd = class_num2obd(i);
431
432                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
433                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
434                         read_unlock(&obd_dev_lock);
435                         return i;
436                 }
437         }
438         read_unlock(&obd_dev_lock);
439
440         return -1;
441 }
442 EXPORT_SYMBOL(class_uuid2dev);
443
444 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
445 {
446         int dev = class_uuid2dev(uuid);
447         if (dev < 0)
448                 return NULL;
449         return class_num2obd(dev);
450 }
451 EXPORT_SYMBOL(class_uuid2obd);
452
453 /**
454  * Get obd device from ::obd_devs[]
455  *
456  * \param num [in] array index
457  *
458  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
459  *         otherwise return the obd device there.
460  */
461 struct obd_device *class_num2obd(int num)
462 {
463         struct obd_device *obd = NULL;
464
465         if (num < class_devno_max()) {
466                 obd = obd_devs[num];
467                 if (obd == NULL)
468                         return NULL;
469
470                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
471                          "%p obd_magic %08x != %08x\n",
472                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
473                 LASSERTF(obd->obd_minor == num,
474                          "%p obd_minor %0d != %0d\n",
475                          obd, obd->obd_minor, num);
476         }
477
478         return obd;
479 }
480 EXPORT_SYMBOL(class_num2obd);
481
482 /**
483  * Get obd devices count. Device in any
484  *    state are counted
485  * \retval obd device count
486  */
487 int get_devices_count(void)
488 {
489         int index, max_index = class_devno_max(), dev_count = 0;
490
491         read_lock(&obd_dev_lock);
492         for (index = 0; index <= max_index; index++) {
493                 struct obd_device *obd = class_num2obd(index);
494                 if (obd != NULL)
495                         dev_count++;
496         }
497         read_unlock(&obd_dev_lock);
498
499         return dev_count;
500 }
501 EXPORT_SYMBOL(get_devices_count);
502
503 void class_obd_list(void)
504 {
505         char *status;
506         int i;
507
508         read_lock(&obd_dev_lock);
509         for (i = 0; i < class_devno_max(); i++) {
510                 struct obd_device *obd = class_num2obd(i);
511
512                 if (obd == NULL)
513                         continue;
514                 if (obd->obd_stopping)
515                         status = "ST";
516                 else if (obd->obd_set_up)
517                         status = "UP";
518                 else if (obd->obd_attached)
519                         status = "AT";
520                 else
521                         status = "--";
522                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
523                          i, status, obd->obd_type->typ_name,
524                          obd->obd_name, obd->obd_uuid.uuid,
525                          cfs_atomic_read(&obd->obd_refcount));
526         }
527         read_unlock(&obd_dev_lock);
528         return;
529 }
530
531 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
532    specified, then only the client with that uuid is returned,
533    otherwise any client connected to the tgt is returned. */
534 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
535                                           const char * typ_name,
536                                           struct obd_uuid *grp_uuid)
537 {
538         int i;
539
540         read_lock(&obd_dev_lock);
541         for (i = 0; i < class_devno_max(); i++) {
542                 struct obd_device *obd = class_num2obd(i);
543
544                 if (obd == NULL)
545                         continue;
546                 if ((strncmp(obd->obd_type->typ_name, typ_name,
547                              strlen(typ_name)) == 0)) {
548                         if (obd_uuid_equals(tgt_uuid,
549                                             &obd->u.cli.cl_target_uuid) &&
550                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
551                                                          &obd->obd_uuid) : 1)) {
552                                 read_unlock(&obd_dev_lock);
553                                 return obd;
554                         }
555                 }
556         }
557         read_unlock(&obd_dev_lock);
558
559         return NULL;
560 }
561 EXPORT_SYMBOL(class_find_client_obd);
562
563 /* Iterate the obd_device list looking devices have grp_uuid. Start
564    searching at *next, and if a device is found, the next index to look
565    at is saved in *next. If next is NULL, then the first matching device
566    will always be returned. */
567 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
568 {
569         int i;
570
571         if (next == NULL)
572                 i = 0;
573         else if (*next >= 0 && *next < class_devno_max())
574                 i = *next;
575         else
576                 return NULL;
577
578         read_lock(&obd_dev_lock);
579         for (; i < class_devno_max(); i++) {
580                 struct obd_device *obd = class_num2obd(i);
581
582                 if (obd == NULL)
583                         continue;
584                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
585                         if (next != NULL)
586                                 *next = i+1;
587                         read_unlock(&obd_dev_lock);
588                         return obd;
589                 }
590         }
591         read_unlock(&obd_dev_lock);
592
593         return NULL;
594 }
595 EXPORT_SYMBOL(class_devices_in_group);
596
597 /**
598  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
599  * adjust sptlrpc settings accordingly.
600  */
601 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
602 {
603         struct obd_device  *obd;
604         const char         *type;
605         int                 i, rc = 0, rc2;
606
607         LASSERT(namelen > 0);
608
609         read_lock(&obd_dev_lock);
610         for (i = 0; i < class_devno_max(); i++) {
611                 obd = class_num2obd(i);
612
613                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
614                         continue;
615
616                 /* only notify mdc, osc, mdt, ost */
617                 type = obd->obd_type->typ_name;
618                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
619                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
620                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
621                     strcmp(type, LUSTRE_OST_NAME) != 0)
622                         continue;
623
624                 if (strncmp(obd->obd_name, fsname, namelen))
625                         continue;
626
627                 class_incref(obd, __FUNCTION__, obd);
628                 read_unlock(&obd_dev_lock);
629                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
630                                          sizeof(KEY_SPTLRPC_CONF),
631                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
632                 rc = rc ? rc : rc2;
633                 class_decref(obd, __FUNCTION__, obd);
634                 read_lock(&obd_dev_lock);
635         }
636         read_unlock(&obd_dev_lock);
637         return rc;
638 }
639 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
640
641 void obd_cleanup_caches(void)
642 {
643         int rc;
644
645         ENTRY;
646         if (obd_device_cachep) {
647                 rc = cfs_mem_cache_destroy(obd_device_cachep);
648                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
649                 obd_device_cachep = NULL;
650         }
651         if (obdo_cachep) {
652                 rc = cfs_mem_cache_destroy(obdo_cachep);
653                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
654                 obdo_cachep = NULL;
655         }
656         if (import_cachep) {
657                 rc = cfs_mem_cache_destroy(import_cachep);
658                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
659                 import_cachep = NULL;
660         }
661         if (capa_cachep) {
662                 rc = cfs_mem_cache_destroy(capa_cachep);
663                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
664                 capa_cachep = NULL;
665         }
666         EXIT;
667 }
668
669 int obd_init_caches(void)
670 {
671         ENTRY;
672
673         LASSERT(obd_device_cachep == NULL);
674         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
675                                                  sizeof(struct obd_device),
676                                                  0, 0);
677         if (!obd_device_cachep)
678                 GOTO(out, -ENOMEM);
679
680         LASSERT(obdo_cachep == NULL);
681         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
682                                            0, 0);
683         if (!obdo_cachep)
684                 GOTO(out, -ENOMEM);
685
686         LASSERT(import_cachep == NULL);
687         import_cachep = cfs_mem_cache_create("ll_import_cache",
688                                              sizeof(struct obd_import),
689                                              0, 0);
690         if (!import_cachep)
691                 GOTO(out, -ENOMEM);
692
693         LASSERT(capa_cachep == NULL);
694         capa_cachep = cfs_mem_cache_create("capa_cache",
695                                            sizeof(struct obd_capa), 0, 0);
696         if (!capa_cachep)
697                 GOTO(out, -ENOMEM);
698
699         RETURN(0);
700  out:
701         obd_cleanup_caches();
702         RETURN(-ENOMEM);
703
704 }
705
706 /* map connection to client */
707 struct obd_export *class_conn2export(struct lustre_handle *conn)
708 {
709         struct obd_export *export;
710         ENTRY;
711
712         if (!conn) {
713                 CDEBUG(D_CACHE, "looking for null handle\n");
714                 RETURN(NULL);
715         }
716
717         if (conn->cookie == -1) {  /* this means assign a new connection */
718                 CDEBUG(D_CACHE, "want a new connection\n");
719                 RETURN(NULL);
720         }
721
722         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
723         export = class_handle2object(conn->cookie);
724         RETURN(export);
725 }
726 EXPORT_SYMBOL(class_conn2export);
727
728 struct obd_device *class_exp2obd(struct obd_export *exp)
729 {
730         if (exp)
731                 return exp->exp_obd;
732         return NULL;
733 }
734 EXPORT_SYMBOL(class_exp2obd);
735
736 struct obd_device *class_conn2obd(struct lustre_handle *conn)
737 {
738         struct obd_export *export;
739         export = class_conn2export(conn);
740         if (export) {
741                 struct obd_device *obd = export->exp_obd;
742                 class_export_put(export);
743                 return obd;
744         }
745         return NULL;
746 }
747 EXPORT_SYMBOL(class_conn2obd);
748
749 struct obd_import *class_exp2cliimp(struct obd_export *exp)
750 {
751         struct obd_device *obd = exp->exp_obd;
752         if (obd == NULL)
753                 return NULL;
754         return obd->u.cli.cl_import;
755 }
756 EXPORT_SYMBOL(class_exp2cliimp);
757
758 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
759 {
760         struct obd_device *obd = class_conn2obd(conn);
761         if (obd == NULL)
762                 return NULL;
763         return obd->u.cli.cl_import;
764 }
765 EXPORT_SYMBOL(class_conn2cliimp);
766
767 /* Export management functions */
768 static void class_export_destroy(struct obd_export *exp)
769 {
770         struct obd_device *obd = exp->exp_obd;
771         ENTRY;
772
773         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
774         LASSERT(obd != NULL);
775
776         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
777                exp->exp_client_uuid.uuid, obd->obd_name);
778
779         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
780         if (exp->exp_connection)
781                 ptlrpc_put_connection_superhack(exp->exp_connection);
782
783         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
784         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
785         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
786         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
787         obd_destroy_export(exp);
788         class_decref(obd, "export", exp);
789
790         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
791         EXIT;
792 }
793
794 static void export_handle_addref(void *export)
795 {
796         class_export_get(export);
797 }
798
799 static struct portals_handle_ops export_handle_ops = {
800         .hop_addref = export_handle_addref,
801         .hop_free   = NULL,
802 };
803
804 struct obd_export *class_export_get(struct obd_export *exp)
805 {
806         cfs_atomic_inc(&exp->exp_refcount);
807         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
808                cfs_atomic_read(&exp->exp_refcount));
809         return exp;
810 }
811 EXPORT_SYMBOL(class_export_get);
812
813 void class_export_put(struct obd_export *exp)
814 {
815         LASSERT(exp != NULL);
816         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
817         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
818                cfs_atomic_read(&exp->exp_refcount) - 1);
819
820         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
821                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
822                 CDEBUG(D_IOCTL, "final put %p/%s\n",
823                        exp, exp->exp_client_uuid.uuid);
824
825                 /* release nid stat refererence */
826                 lprocfs_exp_cleanup(exp);
827
828                 obd_zombie_export_add(exp);
829         }
830 }
831 EXPORT_SYMBOL(class_export_put);
832
833 /* Creates a new export, adds it to the hash table, and returns a
834  * pointer to it. The refcount is 2: one for the hash reference, and
835  * one for the pointer returned by this function. */
836 struct obd_export *class_new_export(struct obd_device *obd,
837                                     struct obd_uuid *cluuid)
838 {
839         struct obd_export *export;
840         cfs_hash_t *hash = NULL;
841         int rc = 0;
842         ENTRY;
843
844         OBD_ALLOC_PTR(export);
845         if (!export)
846                 return ERR_PTR(-ENOMEM);
847
848         export->exp_conn_cnt = 0;
849         export->exp_lock_hash = NULL;
850         export->exp_flock_hash = NULL;
851         cfs_atomic_set(&export->exp_refcount, 2);
852         cfs_atomic_set(&export->exp_rpc_count, 0);
853         cfs_atomic_set(&export->exp_cb_count, 0);
854         cfs_atomic_set(&export->exp_locks_count, 0);
855 #if LUSTRE_TRACKS_LOCK_EXP_REFS
856         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
857         spin_lock_init(&export->exp_locks_list_guard);
858 #endif
859         cfs_atomic_set(&export->exp_replay_count, 0);
860         export->exp_obd = obd;
861         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
862         spin_lock_init(&export->exp_uncommitted_replies_lock);
863         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
864         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
865         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
866         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
867         class_handle_hash(&export->exp_handle, &export_handle_ops);
868         export->exp_last_request_time = cfs_time_current_sec();
869         spin_lock_init(&export->exp_lock);
870         spin_lock_init(&export->exp_rpc_lock);
871         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
872         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
873         spin_lock_init(&export->exp_bl_list_lock);
874         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
875
876         export->exp_sp_peer = LUSTRE_SP_ANY;
877         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
878         export->exp_client_uuid = *cluuid;
879         obd_init_export(export);
880
881         spin_lock(&obd->obd_dev_lock);
882         /* shouldn't happen, but might race */
883         if (obd->obd_stopping)
884                 GOTO(exit_unlock, rc = -ENODEV);
885
886         hash = cfs_hash_getref(obd->obd_uuid_hash);
887         if (hash == NULL)
888                 GOTO(exit_unlock, rc = -ENODEV);
889         spin_unlock(&obd->obd_dev_lock);
890
891         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
892                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
893                 if (rc != 0) {
894                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
895                                       obd->obd_name, cluuid->uuid, rc);
896                         GOTO(exit_err, rc = -EALREADY);
897                 }
898         }
899
900         spin_lock(&obd->obd_dev_lock);
901         if (obd->obd_stopping) {
902                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
903                 GOTO(exit_unlock, rc = -ENODEV);
904         }
905
906         class_incref(obd, "export", export);
907         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
908         cfs_list_add_tail(&export->exp_obd_chain_timed,
909                           &export->exp_obd->obd_exports_timed);
910         export->exp_obd->obd_num_exports++;
911         spin_unlock(&obd->obd_dev_lock);
912         cfs_hash_putref(hash);
913         RETURN(export);
914
915 exit_unlock:
916         spin_unlock(&obd->obd_dev_lock);
917 exit_err:
918         if (hash)
919                 cfs_hash_putref(hash);
920         class_handle_unhash(&export->exp_handle);
921         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
922         obd_destroy_export(export);
923         OBD_FREE_PTR(export);
924         return ERR_PTR(rc);
925 }
926 EXPORT_SYMBOL(class_new_export);
927
928 void class_unlink_export(struct obd_export *exp)
929 {
930         class_handle_unhash(&exp->exp_handle);
931
932         spin_lock(&exp->exp_obd->obd_dev_lock);
933         /* delete an uuid-export hashitem from hashtables */
934         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
935                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
936                              &exp->exp_client_uuid,
937                              &exp->exp_uuid_hash);
938
939         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
940         cfs_list_del_init(&exp->exp_obd_chain_timed);
941         exp->exp_obd->obd_num_exports--;
942         spin_unlock(&exp->exp_obd->obd_dev_lock);
943         class_export_put(exp);
944 }
945 EXPORT_SYMBOL(class_unlink_export);
946
947 /* Import management functions */
948 void class_import_destroy(struct obd_import *imp)
949 {
950         ENTRY;
951
952         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
953                 imp->imp_obd->obd_name);
954
955         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
956
957         ptlrpc_put_connection_superhack(imp->imp_connection);
958
959         while (!cfs_list_empty(&imp->imp_conn_list)) {
960                 struct obd_import_conn *imp_conn;
961
962                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
963                                           struct obd_import_conn, oic_item);
964                 cfs_list_del_init(&imp_conn->oic_item);
965                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
966                 OBD_FREE(imp_conn, sizeof(*imp_conn));
967         }
968
969         LASSERT(imp->imp_sec == NULL);
970         class_decref(imp->imp_obd, "import", imp);
971         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
972         EXIT;
973 }
974
975 static void import_handle_addref(void *import)
976 {
977         class_import_get(import);
978 }
979
980 static struct portals_handle_ops import_handle_ops = {
981         .hop_addref = import_handle_addref,
982         .hop_free   = NULL,
983 };
984
985 struct obd_import *class_import_get(struct obd_import *import)
986 {
987         cfs_atomic_inc(&import->imp_refcount);
988         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
989                cfs_atomic_read(&import->imp_refcount),
990                import->imp_obd->obd_name);
991         return import;
992 }
993 EXPORT_SYMBOL(class_import_get);
994
995 void class_import_put(struct obd_import *imp)
996 {
997         ENTRY;
998
999         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1000         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1001
1002         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1003                cfs_atomic_read(&imp->imp_refcount) - 1,
1004                imp->imp_obd->obd_name);
1005
1006         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1007                 CDEBUG(D_INFO, "final put import %p\n", imp);
1008                 obd_zombie_import_add(imp);
1009         }
1010
1011         /* catch possible import put race */
1012         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1013         EXIT;
1014 }
1015 EXPORT_SYMBOL(class_import_put);
1016
1017 static void init_imp_at(struct imp_at *at) {
1018         int i;
1019         at_init(&at->iat_net_latency, 0, 0);
1020         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1021                 /* max service estimates are tracked on the server side, so
1022                    don't use the AT history here, just use the last reported
1023                    val. (But keep hist for proc histogram, worst_ever) */
1024                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1025                         AT_FLG_NOHIST);
1026         }
1027 }
1028
1029 struct obd_import *class_new_import(struct obd_device *obd)
1030 {
1031         struct obd_import *imp;
1032
1033         OBD_ALLOC(imp, sizeof(*imp));
1034         if (imp == NULL)
1035                 return NULL;
1036
1037         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1038         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1039         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1040         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1041         spin_lock_init(&imp->imp_lock);
1042         imp->imp_last_success_conn = 0;
1043         imp->imp_state = LUSTRE_IMP_NEW;
1044         imp->imp_obd = class_incref(obd, "import", imp);
1045         mutex_init(&imp->imp_sec_mutex);
1046         cfs_waitq_init(&imp->imp_recovery_waitq);
1047
1048         cfs_atomic_set(&imp->imp_refcount, 2);
1049         cfs_atomic_set(&imp->imp_unregistering, 0);
1050         cfs_atomic_set(&imp->imp_inflight, 0);
1051         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1052         cfs_atomic_set(&imp->imp_inval_count, 0);
1053         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1054         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1055         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1056         init_imp_at(&imp->imp_at);
1057
1058         /* the default magic is V2, will be used in connect RPC, and
1059          * then adjusted according to the flags in request/reply. */
1060         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1061
1062         return imp;
1063 }
1064 EXPORT_SYMBOL(class_new_import);
1065
1066 void class_destroy_import(struct obd_import *import)
1067 {
1068         LASSERT(import != NULL);
1069         LASSERT(import != LP_POISON);
1070
1071         class_handle_unhash(&import->imp_handle);
1072
1073         spin_lock(&import->imp_lock);
1074         import->imp_generation++;
1075         spin_unlock(&import->imp_lock);
1076         class_import_put(import);
1077 }
1078 EXPORT_SYMBOL(class_destroy_import);
1079
1080 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1081
1082 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1083 {
1084         spin_lock(&exp->exp_locks_list_guard);
1085
1086         LASSERT(lock->l_exp_refs_nr >= 0);
1087
1088         if (lock->l_exp_refs_target != NULL &&
1089             lock->l_exp_refs_target != exp) {
1090                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1091                               exp, lock, lock->l_exp_refs_target);
1092         }
1093         if ((lock->l_exp_refs_nr ++) == 0) {
1094                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1095                 lock->l_exp_refs_target = exp;
1096         }
1097         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1098                lock, exp, lock->l_exp_refs_nr);
1099         spin_unlock(&exp->exp_locks_list_guard);
1100 }
1101 EXPORT_SYMBOL(__class_export_add_lock_ref);
1102
1103 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1104 {
1105         spin_lock(&exp->exp_locks_list_guard);
1106         LASSERT(lock->l_exp_refs_nr > 0);
1107         if (lock->l_exp_refs_target != exp) {
1108                 LCONSOLE_WARN("lock %p, "
1109                               "mismatching export pointers: %p, %p\n",
1110                               lock, lock->l_exp_refs_target, exp);
1111         }
1112         if (-- lock->l_exp_refs_nr == 0) {
1113                 cfs_list_del_init(&lock->l_exp_refs_link);
1114                 lock->l_exp_refs_target = NULL;
1115         }
1116         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1117                lock, exp, lock->l_exp_refs_nr);
1118         spin_unlock(&exp->exp_locks_list_guard);
1119 }
1120 EXPORT_SYMBOL(__class_export_del_lock_ref);
1121 #endif
1122
1123 /* A connection defines an export context in which preallocation can
1124    be managed. This releases the export pointer reference, and returns
1125    the export handle, so the export refcount is 1 when this function
1126    returns. */
1127 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1128                   struct obd_uuid *cluuid)
1129 {
1130         struct obd_export *export;
1131         LASSERT(conn != NULL);
1132         LASSERT(obd != NULL);
1133         LASSERT(cluuid != NULL);
1134         ENTRY;
1135
1136         export = class_new_export(obd, cluuid);
1137         if (IS_ERR(export))
1138                 RETURN(PTR_ERR(export));
1139
1140         conn->cookie = export->exp_handle.h_cookie;
1141         class_export_put(export);
1142
1143         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1144                cluuid->uuid, conn->cookie);
1145         RETURN(0);
1146 }
1147 EXPORT_SYMBOL(class_connect);
1148
1149 /* if export is involved in recovery then clean up related things */
1150 void class_export_recovery_cleanup(struct obd_export *exp)
1151 {
1152         struct obd_device *obd = exp->exp_obd;
1153
1154         spin_lock(&obd->obd_recovery_task_lock);
1155         if (exp->exp_delayed)
1156                 obd->obd_delayed_clients--;
1157         if (obd->obd_recovering) {
1158                 if (exp->exp_in_recovery) {
1159                         spin_lock(&exp->exp_lock);
1160                         exp->exp_in_recovery = 0;
1161                         spin_unlock(&exp->exp_lock);
1162                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1163                         cfs_atomic_dec(&obd->obd_connected_clients);
1164                 }
1165
1166                 /* if called during recovery then should update
1167                  * obd_stale_clients counter,
1168                  * lightweight exports are not counted */
1169                 if (exp->exp_failed &&
1170                     (exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) == 0)
1171                         exp->exp_obd->obd_stale_clients++;
1172         }
1173         spin_unlock(&obd->obd_recovery_task_lock);
1174         /** Cleanup req replay fields */
1175         if (exp->exp_req_replay_needed) {
1176                 spin_lock(&exp->exp_lock);
1177                 exp->exp_req_replay_needed = 0;
1178                 spin_unlock(&exp->exp_lock);
1179                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1180                 cfs_atomic_dec(&obd->obd_req_replay_clients);
1181         }
1182         /** Cleanup lock replay data */
1183         if (exp->exp_lock_replay_needed) {
1184                 spin_lock(&exp->exp_lock);
1185                 exp->exp_lock_replay_needed = 0;
1186                 spin_unlock(&exp->exp_lock);
1187                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1188                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1189         }
1190 }
1191
1192 /* This function removes 1-3 references from the export:
1193  * 1 - for export pointer passed
1194  * and if disconnect really need
1195  * 2 - removing from hash
1196  * 3 - in client_unlink_export
1197  * The export pointer passed to this function can destroyed */
1198 int class_disconnect(struct obd_export *export)
1199 {
1200         int already_disconnected;
1201         ENTRY;
1202
1203         if (export == NULL) {
1204                 CWARN("attempting to free NULL export %p\n", export);
1205                 RETURN(-EINVAL);
1206         }
1207
1208         spin_lock(&export->exp_lock);
1209         already_disconnected = export->exp_disconnected;
1210         export->exp_disconnected = 1;
1211         spin_unlock(&export->exp_lock);
1212
1213         /* class_cleanup(), abort_recovery(), and class_fail_export()
1214          * all end up in here, and if any of them race we shouldn't
1215          * call extra class_export_puts(). */
1216         if (already_disconnected) {
1217                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1218                 GOTO(no_disconn, already_disconnected);
1219         }
1220
1221         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1222                export->exp_handle.h_cookie);
1223
1224         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1225                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1226                              &export->exp_connection->c_peer.nid,
1227                              &export->exp_nid_hash);
1228
1229         class_export_recovery_cleanup(export);
1230         class_unlink_export(export);
1231 no_disconn:
1232         class_export_put(export);
1233         RETURN(0);
1234 }
1235 EXPORT_SYMBOL(class_disconnect);
1236
1237 /* Return non-zero for a fully connected export */
1238 int class_connected_export(struct obd_export *exp)
1239 {
1240         if (exp) {
1241                 int connected;
1242                 spin_lock(&exp->exp_lock);
1243                 connected = (exp->exp_conn_cnt > 0);
1244                 spin_unlock(&exp->exp_lock);
1245                 return connected;
1246         }
1247         return 0;
1248 }
1249 EXPORT_SYMBOL(class_connected_export);
1250
1251 static void class_disconnect_export_list(cfs_list_t *list,
1252                                          enum obd_option flags)
1253 {
1254         int rc;
1255         struct obd_export *exp;
1256         ENTRY;
1257
1258         /* It's possible that an export may disconnect itself, but
1259          * nothing else will be added to this list. */
1260         while (!cfs_list_empty(list)) {
1261                 exp = cfs_list_entry(list->next, struct obd_export,
1262                                      exp_obd_chain);
1263                 /* need for safe call CDEBUG after obd_disconnect */
1264                 class_export_get(exp);
1265
1266                 spin_lock(&exp->exp_lock);
1267                 exp->exp_flags = flags;
1268                 spin_unlock(&exp->exp_lock);
1269
1270                 if (obd_uuid_equals(&exp->exp_client_uuid,
1271                                     &exp->exp_obd->obd_uuid)) {
1272                         CDEBUG(D_HA,
1273                                "exp %p export uuid == obd uuid, don't discon\n",
1274                                exp);
1275                         /* Need to delete this now so we don't end up pointing
1276                          * to work_list later when this export is cleaned up. */
1277                         cfs_list_del_init(&exp->exp_obd_chain);
1278                         class_export_put(exp);
1279                         continue;
1280                 }
1281
1282                 class_export_get(exp);
1283                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1284                        "last request at "CFS_TIME_T"\n",
1285                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1286                        exp, exp->exp_last_request_time);
1287                 /* release one export reference anyway */
1288                 rc = obd_disconnect(exp);
1289
1290                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1291                        obd_export_nid2str(exp), exp, rc);
1292                 class_export_put(exp);
1293         }
1294         EXIT;
1295 }
1296
1297 void class_disconnect_exports(struct obd_device *obd)
1298 {
1299         cfs_list_t work_list;
1300         ENTRY;
1301
1302         /* Move all of the exports from obd_exports to a work list, en masse. */
1303         CFS_INIT_LIST_HEAD(&work_list);
1304         spin_lock(&obd->obd_dev_lock);
1305         cfs_list_splice_init(&obd->obd_exports, &work_list);
1306         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1307         spin_unlock(&obd->obd_dev_lock);
1308
1309         if (!cfs_list_empty(&work_list)) {
1310                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1311                        "disconnecting them\n", obd->obd_minor, obd);
1312                 class_disconnect_export_list(&work_list,
1313                                              exp_flags_from_obd(obd));
1314         } else
1315                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1316                        obd->obd_minor, obd);
1317         EXIT;
1318 }
1319 EXPORT_SYMBOL(class_disconnect_exports);
1320
1321 /* Remove exports that have not completed recovery.
1322  */
1323 void class_disconnect_stale_exports(struct obd_device *obd,
1324                                     int (*test_export)(struct obd_export *))
1325 {
1326         cfs_list_t work_list;
1327         struct obd_export *exp, *n;
1328         int evicted = 0;
1329         ENTRY;
1330
1331         CFS_INIT_LIST_HEAD(&work_list);
1332         spin_lock(&obd->obd_dev_lock);
1333         cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1334                                      exp_obd_chain) {
1335                 /* don't count self-export as client */
1336                 if (obd_uuid_equals(&exp->exp_client_uuid,
1337                                     &exp->exp_obd->obd_uuid))
1338                         continue;
1339
1340                 /* don't evict clients which have no slot in last_rcvd
1341                  * (e.g. lightweight connection) */
1342                 if (exp->exp_target_data.ted_lr_idx == -1)
1343                         continue;
1344
1345                 spin_lock(&exp->exp_lock);
1346                 if (exp->exp_failed || test_export(exp)) {
1347                         spin_unlock(&exp->exp_lock);
1348                         continue;
1349                 }
1350                 exp->exp_failed = 1;
1351                 spin_unlock(&exp->exp_lock);
1352
1353                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1354                 evicted++;
1355                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1356                        obd->obd_name, exp->exp_client_uuid.uuid,
1357                        exp->exp_connection == NULL ? "<unknown>" :
1358                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1359                 print_export_data(exp, "EVICTING", 0);
1360         }
1361         spin_unlock(&obd->obd_dev_lock);
1362
1363         if (evicted)
1364                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1365                               obd->obd_name, evicted);
1366
1367         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1368                                                  OBD_OPT_ABORT_RECOV);
1369         EXIT;
1370 }
1371 EXPORT_SYMBOL(class_disconnect_stale_exports);
1372
1373 void class_fail_export(struct obd_export *exp)
1374 {
1375         int rc, already_failed;
1376
1377         spin_lock(&exp->exp_lock);
1378         already_failed = exp->exp_failed;
1379         exp->exp_failed = 1;
1380         spin_unlock(&exp->exp_lock);
1381
1382         if (already_failed) {
1383                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1384                        exp, exp->exp_client_uuid.uuid);
1385                 return;
1386         }
1387
1388         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1389                exp, exp->exp_client_uuid.uuid);
1390
1391         if (obd_dump_on_timeout)
1392                 libcfs_debug_dumplog();
1393
1394         /* need for safe call CDEBUG after obd_disconnect */
1395         class_export_get(exp);
1396
1397         /* Most callers into obd_disconnect are removing their own reference
1398          * (request, for example) in addition to the one from the hash table.
1399          * We don't have such a reference here, so make one. */
1400         class_export_get(exp);
1401         rc = obd_disconnect(exp);
1402         if (rc)
1403                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1404         else
1405                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1406                        exp, exp->exp_client_uuid.uuid);
1407         class_export_put(exp);
1408 }
1409 EXPORT_SYMBOL(class_fail_export);
1410
1411 char *obd_export_nid2str(struct obd_export *exp)
1412 {
1413         if (exp->exp_connection != NULL)
1414                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1415
1416         return "(no nid)";
1417 }
1418 EXPORT_SYMBOL(obd_export_nid2str);
1419
1420 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1421 {
1422         cfs_hash_t *nid_hash;
1423         struct obd_export *doomed_exp = NULL;
1424         int exports_evicted = 0;
1425
1426         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1427
1428         spin_lock(&obd->obd_dev_lock);
1429         /* umount has run already, so evict thread should leave
1430          * its task to umount thread now */
1431         if (obd->obd_stopping) {
1432                 spin_unlock(&obd->obd_dev_lock);
1433                 return exports_evicted;
1434         }
1435         nid_hash = obd->obd_nid_hash;
1436         cfs_hash_getref(nid_hash);
1437         spin_unlock(&obd->obd_dev_lock);
1438
1439         do {
1440                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1441                 if (doomed_exp == NULL)
1442                         break;
1443
1444                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1445                          "nid %s found, wanted nid %s, requested nid %s\n",
1446                          obd_export_nid2str(doomed_exp),
1447                          libcfs_nid2str(nid_key), nid);
1448                 LASSERTF(doomed_exp != obd->obd_self_export,
1449                          "self-export is hashed by NID?\n");
1450                 exports_evicted++;
1451                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1452                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1453                        exports_evicted);
1454                 class_fail_export(doomed_exp);
1455                 class_export_put(doomed_exp);
1456         } while (1);
1457
1458         cfs_hash_putref(nid_hash);
1459
1460         if (!exports_evicted)
1461                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1462                        obd->obd_name, nid);
1463         return exports_evicted;
1464 }
1465 EXPORT_SYMBOL(obd_export_evict_by_nid);
1466
1467 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1468 {
1469         cfs_hash_t *uuid_hash;
1470         struct obd_export *doomed_exp = NULL;
1471         struct obd_uuid doomed_uuid;
1472         int exports_evicted = 0;
1473
1474         spin_lock(&obd->obd_dev_lock);
1475         if (obd->obd_stopping) {
1476                 spin_unlock(&obd->obd_dev_lock);
1477                 return exports_evicted;
1478         }
1479         uuid_hash = obd->obd_uuid_hash;
1480         cfs_hash_getref(uuid_hash);
1481         spin_unlock(&obd->obd_dev_lock);
1482
1483         obd_str2uuid(&doomed_uuid, uuid);
1484         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1485                 CERROR("%s: can't evict myself\n", obd->obd_name);
1486                 cfs_hash_putref(uuid_hash);
1487                 return exports_evicted;
1488         }
1489
1490         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1491
1492         if (doomed_exp == NULL) {
1493                 CERROR("%s: can't disconnect %s: no exports found\n",
1494                        obd->obd_name, uuid);
1495         } else {
1496                 CWARN("%s: evicting %s at adminstrative request\n",
1497                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1498                 class_fail_export(doomed_exp);
1499                 class_export_put(doomed_exp);
1500                 exports_evicted++;
1501         }
1502         cfs_hash_putref(uuid_hash);
1503
1504         return exports_evicted;
1505 }
1506 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1507
1508 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1509 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1510 EXPORT_SYMBOL(class_export_dump_hook);
1511 #endif
1512
1513 static void print_export_data(struct obd_export *exp, const char *status,
1514                               int locks)
1515 {
1516         struct ptlrpc_reply_state *rs;
1517         struct ptlrpc_reply_state *first_reply = NULL;
1518         int nreplies = 0;
1519
1520         spin_lock(&exp->exp_lock);
1521         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1522                                 rs_exp_list) {
1523                 if (nreplies == 0)
1524                         first_reply = rs;
1525                 nreplies++;
1526         }
1527         spin_unlock(&exp->exp_lock);
1528
1529         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1530                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1531                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1532                cfs_atomic_read(&exp->exp_rpc_count),
1533                cfs_atomic_read(&exp->exp_cb_count),
1534                cfs_atomic_read(&exp->exp_locks_count),
1535                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1536                nreplies, first_reply, nreplies > 3 ? "..." : "",
1537                exp->exp_last_committed);
1538 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1539         if (locks && class_export_dump_hook != NULL)
1540                 class_export_dump_hook(exp);
1541 #endif
1542 }
1543
1544 void dump_exports(struct obd_device *obd, int locks)
1545 {
1546         struct obd_export *exp;
1547
1548         spin_lock(&obd->obd_dev_lock);
1549         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1550                 print_export_data(exp, "ACTIVE", locks);
1551         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1552                 print_export_data(exp, "UNLINKED", locks);
1553         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1554                 print_export_data(exp, "DELAYED", locks);
1555         spin_unlock(&obd->obd_dev_lock);
1556         spin_lock(&obd_zombie_impexp_lock);
1557         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1558                 print_export_data(exp, "ZOMBIE", locks);
1559         spin_unlock(&obd_zombie_impexp_lock);
1560 }
1561 EXPORT_SYMBOL(dump_exports);
1562
1563 void obd_exports_barrier(struct obd_device *obd)
1564 {
1565         int waited = 2;
1566         LASSERT(cfs_list_empty(&obd->obd_exports));
1567         spin_lock(&obd->obd_dev_lock);
1568         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1569                 spin_unlock(&obd->obd_dev_lock);
1570                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1571                                                    cfs_time_seconds(waited));
1572                 if (waited > 5 && IS_PO2(waited)) {
1573                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1574                                       "more than %d seconds. "
1575                                       "The obd refcount = %d. Is it stuck?\n",
1576                                       obd->obd_name, waited,
1577                                       cfs_atomic_read(&obd->obd_refcount));
1578                         dump_exports(obd, 1);
1579                 }
1580                 waited *= 2;
1581                 spin_lock(&obd->obd_dev_lock);
1582         }
1583         spin_unlock(&obd->obd_dev_lock);
1584 }
1585 EXPORT_SYMBOL(obd_exports_barrier);
1586
1587 /* Total amount of zombies to be destroyed */
1588 static int zombies_count = 0;
1589
1590 /**
1591  * kill zombie imports and exports
1592  */
1593 void obd_zombie_impexp_cull(void)
1594 {
1595         struct obd_import *import;
1596         struct obd_export *export;
1597         ENTRY;
1598
1599         do {
1600                 spin_lock(&obd_zombie_impexp_lock);
1601
1602                 import = NULL;
1603                 if (!cfs_list_empty(&obd_zombie_imports)) {
1604                         import = cfs_list_entry(obd_zombie_imports.next,
1605                                                 struct obd_import,
1606                                                 imp_zombie_chain);
1607                         cfs_list_del_init(&import->imp_zombie_chain);
1608                 }
1609
1610                 export = NULL;
1611                 if (!cfs_list_empty(&obd_zombie_exports)) {
1612                         export = cfs_list_entry(obd_zombie_exports.next,
1613                                                 struct obd_export,
1614                                                 exp_obd_chain);
1615                         cfs_list_del_init(&export->exp_obd_chain);
1616                 }
1617
1618                 spin_unlock(&obd_zombie_impexp_lock);
1619
1620                 if (import != NULL) {
1621                         class_import_destroy(import);
1622                         spin_lock(&obd_zombie_impexp_lock);
1623                         zombies_count--;
1624                         spin_unlock(&obd_zombie_impexp_lock);
1625                 }
1626
1627                 if (export != NULL) {
1628                         class_export_destroy(export);
1629                         spin_lock(&obd_zombie_impexp_lock);
1630                         zombies_count--;
1631                         spin_unlock(&obd_zombie_impexp_lock);
1632                 }
1633
1634                 cfs_cond_resched();
1635         } while (import != NULL || export != NULL);
1636         EXIT;
1637 }
1638
1639 static struct completion        obd_zombie_start;
1640 static struct completion        obd_zombie_stop;
1641 static unsigned long            obd_zombie_flags;
1642 static cfs_waitq_t              obd_zombie_waitq;
1643 static pid_t                    obd_zombie_pid;
1644
1645 enum {
1646         OBD_ZOMBIE_STOP         = 0x0001,
1647 };
1648
1649 /**
1650  * check for work for kill zombie import/export thread.
1651  */
1652 static int obd_zombie_impexp_check(void *arg)
1653 {
1654         int rc;
1655
1656         spin_lock(&obd_zombie_impexp_lock);
1657         rc = (zombies_count == 0) &&
1658              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1659         spin_unlock(&obd_zombie_impexp_lock);
1660
1661         RETURN(rc);
1662 }
1663
1664 /**
1665  * Add export to the obd_zombe thread and notify it.
1666  */
1667 static void obd_zombie_export_add(struct obd_export *exp) {
1668         spin_lock(&exp->exp_obd->obd_dev_lock);
1669         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1670         cfs_list_del_init(&exp->exp_obd_chain);
1671         spin_unlock(&exp->exp_obd->obd_dev_lock);
1672         spin_lock(&obd_zombie_impexp_lock);
1673         zombies_count++;
1674         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1675         spin_unlock(&obd_zombie_impexp_lock);
1676
1677         obd_zombie_impexp_notify();
1678 }
1679
1680 /**
1681  * Add import to the obd_zombe thread and notify it.
1682  */
1683 static void obd_zombie_import_add(struct obd_import *imp) {
1684         LASSERT(imp->imp_sec == NULL);
1685         LASSERT(imp->imp_rq_pool == NULL);
1686         spin_lock(&obd_zombie_impexp_lock);
1687         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1688         zombies_count++;
1689         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1690         spin_unlock(&obd_zombie_impexp_lock);
1691
1692         obd_zombie_impexp_notify();
1693 }
1694
1695 /**
1696  * notify import/export destroy thread about new zombie.
1697  */
1698 static void obd_zombie_impexp_notify(void)
1699 {
1700         /*
1701          * Make sure obd_zomebie_impexp_thread get this notification.
1702          * It is possible this signal only get by obd_zombie_barrier, and
1703          * barrier gulps this notification and sleeps away and hangs ensues
1704          */
1705         cfs_waitq_broadcast(&obd_zombie_waitq);
1706 }
1707
1708 /**
1709  * check whether obd_zombie is idle
1710  */
1711 static int obd_zombie_is_idle(void)
1712 {
1713         int rc;
1714
1715         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1716         spin_lock(&obd_zombie_impexp_lock);
1717         rc = (zombies_count == 0);
1718         spin_unlock(&obd_zombie_impexp_lock);
1719         return rc;
1720 }
1721
1722 /**
1723  * wait when obd_zombie import/export queues become empty
1724  */
1725 void obd_zombie_barrier(void)
1726 {
1727         struct l_wait_info lwi = { 0 };
1728
1729         if (obd_zombie_pid == cfs_curproc_pid())
1730                 /* don't wait for myself */
1731                 return;
1732         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1733 }
1734 EXPORT_SYMBOL(obd_zombie_barrier);
1735
1736 #ifdef __KERNEL__
1737
1738 /**
1739  * destroy zombie export/import thread.
1740  */
1741 static int obd_zombie_impexp_thread(void *unused)
1742 {
1743         int rc;
1744
1745         rc = cfs_daemonize_ctxt("obd_zombid");
1746         if (rc != 0) {
1747                 complete(&obd_zombie_start);
1748                 RETURN(rc);
1749         }
1750
1751         complete(&obd_zombie_start);
1752
1753         obd_zombie_pid = cfs_curproc_pid();
1754
1755         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1756                 struct l_wait_info lwi = { 0 };
1757
1758                 l_wait_event(obd_zombie_waitq,
1759                              !obd_zombie_impexp_check(NULL), &lwi);
1760                 obd_zombie_impexp_cull();
1761
1762                 /*
1763                  * Notify obd_zombie_barrier callers that queues
1764                  * may be empty.
1765                  */
1766                 cfs_waitq_signal(&obd_zombie_waitq);
1767         }
1768
1769         complete(&obd_zombie_stop);
1770
1771         RETURN(0);
1772 }
1773
1774 #else /* ! KERNEL */
1775
1776 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1777 static void *obd_zombie_impexp_work_cb;
1778 static void *obd_zombie_impexp_idle_cb;
1779
1780 int obd_zombie_impexp_kill(void *arg)
1781 {
1782         int rc = 0;
1783
1784         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1785                 obd_zombie_impexp_cull();
1786                 rc = 1;
1787         }
1788         cfs_atomic_dec(&zombie_recur);
1789         return rc;
1790 }
1791
1792 #endif
1793
1794 /**
1795  * start destroy zombie import/export thread
1796  */
1797 int obd_zombie_impexp_init(void)
1798 {
1799         int rc;
1800
1801         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1802         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1803         spin_lock_init(&obd_zombie_impexp_lock);
1804         init_completion(&obd_zombie_start);
1805         init_completion(&obd_zombie_stop);
1806         cfs_waitq_init(&obd_zombie_waitq);
1807         obd_zombie_pid = 0;
1808
1809 #ifdef __KERNEL__
1810         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1811         if (rc < 0)
1812                 RETURN(rc);
1813
1814         wait_for_completion(&obd_zombie_start);
1815 #else
1816
1817         obd_zombie_impexp_work_cb =
1818                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1819                                                  &obd_zombie_impexp_kill, NULL);
1820
1821         obd_zombie_impexp_idle_cb =
1822                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1823                                                  &obd_zombie_impexp_check, NULL);
1824         rc = 0;
1825 #endif
1826         RETURN(rc);
1827 }
1828 /**
1829  * stop destroy zombie import/export thread
1830  */
1831 void obd_zombie_impexp_stop(void)
1832 {
1833         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1834         obd_zombie_impexp_notify();
1835 #ifdef __KERNEL__
1836         wait_for_completion(&obd_zombie_stop);
1837 #else
1838         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1839         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1840 #endif
1841 }
1842
1843 /***** Kernel-userspace comm helpers *******/
1844
1845 /* Get length of entire message, including header */
1846 int kuc_len(int payload_len)
1847 {
1848         return sizeof(struct kuc_hdr) + payload_len;
1849 }
1850 EXPORT_SYMBOL(kuc_len);
1851
1852 /* Get a pointer to kuc header, given a ptr to the payload
1853  * @param p Pointer to payload area
1854  * @returns Pointer to kuc header
1855  */
1856 struct kuc_hdr * kuc_ptr(void *p)
1857 {
1858         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1859         LASSERT(lh->kuc_magic == KUC_MAGIC);
1860         return lh;
1861 }
1862 EXPORT_SYMBOL(kuc_ptr);
1863
1864 /* Test if payload is part of kuc message
1865  * @param p Pointer to payload area
1866  * @returns boolean
1867  */
1868 int kuc_ispayload(void *p)
1869 {
1870         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1871
1872         if (kh->kuc_magic == KUC_MAGIC)
1873                 return 1;
1874         else
1875                 return 0;
1876 }
1877 EXPORT_SYMBOL(kuc_ispayload);
1878
1879 /* Alloc space for a message, and fill in header
1880  * @return Pointer to payload area
1881  */
1882 void *kuc_alloc(int payload_len, int transport, int type)
1883 {
1884         struct kuc_hdr *lh;
1885         int len = kuc_len(payload_len);
1886
1887         OBD_ALLOC(lh, len);
1888         if (lh == NULL)
1889                 return ERR_PTR(-ENOMEM);
1890
1891         lh->kuc_magic = KUC_MAGIC;
1892         lh->kuc_transport = transport;
1893         lh->kuc_msgtype = type;
1894         lh->kuc_msglen = len;
1895
1896         return (void *)(lh + 1);
1897 }
1898 EXPORT_SYMBOL(kuc_alloc);
1899
1900 /* Takes pointer to payload area */
1901 inline void kuc_free(void *p, int payload_len)
1902 {
1903         struct kuc_hdr *lh = kuc_ptr(p);
1904         OBD_FREE(lh, kuc_len(payload_len));
1905 }
1906 EXPORT_SYMBOL(kuc_free);
1907
1908
1909