Whamcloud - gitweb
LU-2785 osc: remove unused obd methods
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_class.h>
47 #include <lprocfs_status.h>
48
49 extern cfs_list_t obd_types;
50 spinlock_t obd_types_lock;
51
52 struct kmem_cache *obd_device_cachep;
53 struct kmem_cache *obdo_cachep;
54 EXPORT_SYMBOL(obdo_cachep);
55 struct kmem_cache *import_cachep;
56
57 cfs_list_t      obd_zombie_imports;
58 cfs_list_t      obd_zombie_exports;
59 spinlock_t  obd_zombie_impexp_lock;
60 static void obd_zombie_impexp_notify(void);
61 static void obd_zombie_export_add(struct obd_export *exp);
62 static void obd_zombie_import_add(struct obd_import *imp);
63 static void print_export_data(struct obd_export *exp,
64                               const char *status, int locks);
65
66 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
68
69 /*
70  * support functions: we could use inter-module communication, but this
71  * is more portable to other OS's
72  */
73 static struct obd_device *obd_device_alloc(void)
74 {
75         struct obd_device *obd;
76
77         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
78         if (obd != NULL) {
79                 obd->obd_magic = OBD_DEVICE_MAGIC;
80         }
81         return obd;
82 }
83
84 static void obd_device_free(struct obd_device *obd)
85 {
86         LASSERT(obd != NULL);
87         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
89         if (obd->obd_namespace != NULL) {
90                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91                        obd, obd->obd_namespace, obd->obd_force);
92                 LBUG();
93         }
94         lu_ref_fini(&obd->obd_reference);
95         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
96 }
97
98 struct obd_type *class_search_type(const char *name)
99 {
100         cfs_list_t *tmp;
101         struct obd_type *type;
102
103         spin_lock(&obd_types_lock);
104         cfs_list_for_each(tmp, &obd_types) {
105                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
106                 if (strcmp(type->typ_name, name) == 0) {
107                         spin_unlock(&obd_types_lock);
108                         return type;
109                 }
110         }
111         spin_unlock(&obd_types_lock);
112         return NULL;
113 }
114 EXPORT_SYMBOL(class_search_type);
115
116 struct obd_type *class_get_type(const char *name)
117 {
118         struct obd_type *type = class_search_type(name);
119
120 #ifdef HAVE_MODULE_LOADING_SUPPORT
121         if (!type) {
122                 const char *modname = name;
123
124                 if (strcmp(modname, "obdfilter") == 0)
125                         modname = "ofd";
126
127                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
128                         modname = LUSTRE_OSP_NAME;
129
130                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
131                         modname = LUSTRE_MDT_NAME;
132
133                 if (!request_module("%s", modname)) {
134                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
135                         type = class_search_type(name);
136                 } else {
137                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
138                                            modname);
139                 }
140         }
141 #endif
142         if (type) {
143                 spin_lock(&type->obd_type_lock);
144                 type->typ_refcnt++;
145                 try_module_get(type->typ_dt_ops->o_owner);
146                 spin_unlock(&type->obd_type_lock);
147         }
148         return type;
149 }
150 EXPORT_SYMBOL(class_get_type);
151
152 void class_put_type(struct obd_type *type)
153 {
154         LASSERT(type);
155         spin_lock(&type->obd_type_lock);
156         type->typ_refcnt--;
157         module_put(type->typ_dt_ops->o_owner);
158         spin_unlock(&type->obd_type_lock);
159 }
160 EXPORT_SYMBOL(class_put_type);
161
162 #define CLASS_MAX_NAME 1024
163
164 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
165                         struct lprocfs_seq_vars *module_vars,
166 #ifndef HAVE_ONLY_PROCFS_SEQ
167                         struct lprocfs_vars *vars,
168 #endif
169                         const char *name, struct lu_device_type *ldt)
170 {
171         struct obd_type *type;
172         int rc = 0;
173         ENTRY;
174
175         /* sanity check */
176         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
177
178         if (class_search_type(name)) {
179                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
180                 RETURN(-EEXIST);
181         }
182
183         rc = -ENOMEM;
184         OBD_ALLOC(type, sizeof(*type));
185         if (type == NULL)
186                 RETURN(rc);
187
188         OBD_ALLOC_PTR(type->typ_dt_ops);
189         OBD_ALLOC_PTR(type->typ_md_ops);
190         OBD_ALLOC(type->typ_name, strlen(name) + 1);
191
192         if (type->typ_dt_ops == NULL ||
193             type->typ_md_ops == NULL ||
194             type->typ_name == NULL)
195                 GOTO (failed, rc);
196
197         *(type->typ_dt_ops) = *dt_ops;
198         /* md_ops is optional */
199         if (md_ops)
200                 *(type->typ_md_ops) = *md_ops;
201         strcpy(type->typ_name, name);
202         spin_lock_init(&type->obd_type_lock);
203
204 #ifdef LPROCFS
205 #ifndef HAVE_ONLY_PROCFS_SEQ
206         if (vars) {
207                 type->typ_procroot = lprocfs_register(type->typ_name,
208                                                         proc_lustre_root,
209                                                         vars, type);
210         } else
211 #endif
212         {
213                 type->typ_procroot = lprocfs_seq_register(type->typ_name,
214                                                         proc_lustre_root,
215                                                         module_vars, type);
216         }
217         if (IS_ERR(type->typ_procroot)) {
218                 rc = PTR_ERR(type->typ_procroot);
219                 type->typ_procroot = NULL;
220                 GOTO (failed, rc);
221         }
222 #endif
223         if (ldt != NULL) {
224                 type->typ_lu = ldt;
225                 rc = lu_device_type_init(ldt);
226                 if (rc != 0)
227                         GOTO (failed, rc);
228         }
229
230         spin_lock(&obd_types_lock);
231         cfs_list_add(&type->typ_chain, &obd_types);
232         spin_unlock(&obd_types_lock);
233
234         RETURN (0);
235
236  failed:
237         if (type->typ_name != NULL)
238                 OBD_FREE(type->typ_name, strlen(name) + 1);
239         if (type->typ_md_ops != NULL)
240                 OBD_FREE_PTR(type->typ_md_ops);
241         if (type->typ_dt_ops != NULL)
242                 OBD_FREE_PTR(type->typ_dt_ops);
243 #ifdef LPROCFS
244 #ifndef HAVE_ONLY_PROCFS_SEQ
245         lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
246 #else
247         remove_proc_subtree(type->typ_name, proc_lustre_root);
248 #endif
249 #endif
250         OBD_FREE(type, sizeof(*type));
251         RETURN(rc);
252 }
253 EXPORT_SYMBOL(class_register_type);
254
255 int class_unregister_type(const char *name)
256 {
257         struct obd_type *type = class_search_type(name);
258         ENTRY;
259
260         if (!type) {
261                 CERROR("unknown obd type\n");
262                 RETURN(-EINVAL);
263         }
264
265         if (type->typ_refcnt) {
266                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
267                 /* This is a bad situation, let's make the best of it */
268                 /* Remove ops, but leave the name for debugging */
269                 OBD_FREE_PTR(type->typ_dt_ops);
270                 OBD_FREE_PTR(type->typ_md_ops);
271                 RETURN(-EBUSY);
272         }
273
274         /* we do not use type->typ_procroot as for compatibility purposes
275          * other modules can share names (i.e. lod can use lov entry). so
276          * we can't reference pointer as it can get invalided when another
277          * module removes the entry */
278 #ifdef LPROCFS
279 #ifndef HAVE_ONLY_PROCFS_SEQ
280         lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
281 #else
282         remove_proc_subtree(type->typ_name, proc_lustre_root);
283 #endif
284 #endif
285         if (type->typ_lu)
286                 lu_device_type_fini(type->typ_lu);
287
288         spin_lock(&obd_types_lock);
289         cfs_list_del(&type->typ_chain);
290         spin_unlock(&obd_types_lock);
291         OBD_FREE(type->typ_name, strlen(name) + 1);
292         if (type->typ_dt_ops != NULL)
293                 OBD_FREE_PTR(type->typ_dt_ops);
294         if (type->typ_md_ops != NULL)
295                 OBD_FREE_PTR(type->typ_md_ops);
296         OBD_FREE(type, sizeof(*type));
297         RETURN(0);
298 } /* class_unregister_type */
299 EXPORT_SYMBOL(class_unregister_type);
300
301 /**
302  * Create a new obd device.
303  *
304  * Find an empty slot in ::obd_devs[], create a new obd device in it.
305  *
306  * \param[in] type_name obd device type string.
307  * \param[in] name      obd device name.
308  *
309  * \retval NULL if create fails, otherwise return the obd device
310  *         pointer created.
311  */
312 struct obd_device *class_newdev(const char *type_name, const char *name)
313 {
314         struct obd_device *result = NULL;
315         struct obd_device *newdev;
316         struct obd_type *type = NULL;
317         int i;
318         int new_obd_minor = 0;
319         ENTRY;
320
321         if (strlen(name) >= MAX_OBD_NAME) {
322                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
323                 RETURN(ERR_PTR(-EINVAL));
324         }
325
326         type = class_get_type(type_name);
327         if (type == NULL){
328                 CERROR("OBD: unknown type: %s\n", type_name);
329                 RETURN(ERR_PTR(-ENODEV));
330         }
331
332         newdev = obd_device_alloc();
333         if (newdev == NULL)
334                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
335
336         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
337
338         write_lock(&obd_dev_lock);
339         for (i = 0; i < class_devno_max(); i++) {
340                 struct obd_device *obd = class_num2obd(i);
341
342                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
343                         CERROR("Device %s already exists at %d, won't add\n",
344                                name, i);
345                         if (result) {
346                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
347                                          "%p obd_magic %08x != %08x\n", result,
348                                          result->obd_magic, OBD_DEVICE_MAGIC);
349                                 LASSERTF(result->obd_minor == new_obd_minor,
350                                          "%p obd_minor %d != %d\n", result,
351                                          result->obd_minor, new_obd_minor);
352
353                                 obd_devs[result->obd_minor] = NULL;
354                                 result->obd_name[0]='\0';
355                          }
356                         result = ERR_PTR(-EEXIST);
357                         break;
358                 }
359                 if (!result && !obd) {
360                         result = newdev;
361                         result->obd_minor = i;
362                         new_obd_minor = i;
363                         result->obd_type = type;
364                         strncpy(result->obd_name, name,
365                                 sizeof(result->obd_name) - 1);
366                         obd_devs[i] = result;
367                 }
368         }
369         write_unlock(&obd_dev_lock);
370
371         if (result == NULL && i >= class_devno_max()) {
372                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
373                        class_devno_max());
374                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
375         }
376
377         if (IS_ERR(result))
378                 GOTO(out, result);
379
380         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
381                result->obd_name, result);
382
383         RETURN(result);
384 out:
385         obd_device_free(newdev);
386 out_type:
387         class_put_type(type);
388         return result;
389 }
390
391 void class_release_dev(struct obd_device *obd)
392 {
393         struct obd_type *obd_type = obd->obd_type;
394
395         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
396                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
397         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
398                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
399         LASSERT(obd_type != NULL);
400
401         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
402                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
403
404         write_lock(&obd_dev_lock);
405         obd_devs[obd->obd_minor] = NULL;
406         write_unlock(&obd_dev_lock);
407         obd_device_free(obd);
408
409         class_put_type(obd_type);
410 }
411
412 int class_name2dev(const char *name)
413 {
414         int i;
415
416         if (!name)
417                 return -1;
418
419         read_lock(&obd_dev_lock);
420         for (i = 0; i < class_devno_max(); i++) {
421                 struct obd_device *obd = class_num2obd(i);
422
423                 if (obd && strcmp(name, obd->obd_name) == 0) {
424                         /* Make sure we finished attaching before we give
425                            out any references */
426                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
427                         if (obd->obd_attached) {
428                                 read_unlock(&obd_dev_lock);
429                                 return i;
430                         }
431                         break;
432                 }
433         }
434         read_unlock(&obd_dev_lock);
435
436         return -1;
437 }
438 EXPORT_SYMBOL(class_name2dev);
439
440 struct obd_device *class_name2obd(const char *name)
441 {
442         int dev = class_name2dev(name);
443
444         if (dev < 0 || dev > class_devno_max())
445                 return NULL;
446         return class_num2obd(dev);
447 }
448 EXPORT_SYMBOL(class_name2obd);
449
450 int class_uuid2dev(struct obd_uuid *uuid)
451 {
452         int i;
453
454         read_lock(&obd_dev_lock);
455         for (i = 0; i < class_devno_max(); i++) {
456                 struct obd_device *obd = class_num2obd(i);
457
458                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
459                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
460                         read_unlock(&obd_dev_lock);
461                         return i;
462                 }
463         }
464         read_unlock(&obd_dev_lock);
465
466         return -1;
467 }
468 EXPORT_SYMBOL(class_uuid2dev);
469
470 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
471 {
472         int dev = class_uuid2dev(uuid);
473         if (dev < 0)
474                 return NULL;
475         return class_num2obd(dev);
476 }
477 EXPORT_SYMBOL(class_uuid2obd);
478
479 /**
480  * Get obd device from ::obd_devs[]
481  *
482  * \param num [in] array index
483  *
484  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
485  *         otherwise return the obd device there.
486  */
487 struct obd_device *class_num2obd(int num)
488 {
489         struct obd_device *obd = NULL;
490
491         if (num < class_devno_max()) {
492                 obd = obd_devs[num];
493                 if (obd == NULL)
494                         return NULL;
495
496                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
497                          "%p obd_magic %08x != %08x\n",
498                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
499                 LASSERTF(obd->obd_minor == num,
500                          "%p obd_minor %0d != %0d\n",
501                          obd, obd->obd_minor, num);
502         }
503
504         return obd;
505 }
506 EXPORT_SYMBOL(class_num2obd);
507
508 /**
509  * Get obd devices count. Device in any
510  *    state are counted
511  * \retval obd device count
512  */
513 int get_devices_count(void)
514 {
515         int index, max_index = class_devno_max(), dev_count = 0;
516
517         read_lock(&obd_dev_lock);
518         for (index = 0; index <= max_index; index++) {
519                 struct obd_device *obd = class_num2obd(index);
520                 if (obd != NULL)
521                         dev_count++;
522         }
523         read_unlock(&obd_dev_lock);
524
525         return dev_count;
526 }
527 EXPORT_SYMBOL(get_devices_count);
528
529 void class_obd_list(void)
530 {
531         char *status;
532         int i;
533
534         read_lock(&obd_dev_lock);
535         for (i = 0; i < class_devno_max(); i++) {
536                 struct obd_device *obd = class_num2obd(i);
537
538                 if (obd == NULL)
539                         continue;
540                 if (obd->obd_stopping)
541                         status = "ST";
542                 else if (obd->obd_set_up)
543                         status = "UP";
544                 else if (obd->obd_attached)
545                         status = "AT";
546                 else
547                         status = "--";
548                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
549                          i, status, obd->obd_type->typ_name,
550                          obd->obd_name, obd->obd_uuid.uuid,
551                          atomic_read(&obd->obd_refcount));
552         }
553         read_unlock(&obd_dev_lock);
554         return;
555 }
556
557 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
558    specified, then only the client with that uuid is returned,
559    otherwise any client connected to the tgt is returned. */
560 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
561                                           const char * typ_name,
562                                           struct obd_uuid *grp_uuid)
563 {
564         int i;
565
566         read_lock(&obd_dev_lock);
567         for (i = 0; i < class_devno_max(); i++) {
568                 struct obd_device *obd = class_num2obd(i);
569
570                 if (obd == NULL)
571                         continue;
572                 if ((strncmp(obd->obd_type->typ_name, typ_name,
573                              strlen(typ_name)) == 0)) {
574                         if (obd_uuid_equals(tgt_uuid,
575                                             &obd->u.cli.cl_target_uuid) &&
576                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
577                                                          &obd->obd_uuid) : 1)) {
578                                 read_unlock(&obd_dev_lock);
579                                 return obd;
580                         }
581                 }
582         }
583         read_unlock(&obd_dev_lock);
584
585         return NULL;
586 }
587 EXPORT_SYMBOL(class_find_client_obd);
588
589 /* Iterate the obd_device list looking devices have grp_uuid. Start
590    searching at *next, and if a device is found, the next index to look
591    at is saved in *next. If next is NULL, then the first matching device
592    will always be returned. */
593 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
594 {
595         int i;
596
597         if (next == NULL)
598                 i = 0;
599         else if (*next >= 0 && *next < class_devno_max())
600                 i = *next;
601         else
602                 return NULL;
603
604         read_lock(&obd_dev_lock);
605         for (; i < class_devno_max(); i++) {
606                 struct obd_device *obd = class_num2obd(i);
607
608                 if (obd == NULL)
609                         continue;
610                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
611                         if (next != NULL)
612                                 *next = i+1;
613                         read_unlock(&obd_dev_lock);
614                         return obd;
615                 }
616         }
617         read_unlock(&obd_dev_lock);
618
619         return NULL;
620 }
621 EXPORT_SYMBOL(class_devices_in_group);
622
623 /**
624  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
625  * adjust sptlrpc settings accordingly.
626  */
627 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
628 {
629         struct obd_device  *obd;
630         const char         *type;
631         int                 i, rc = 0, rc2;
632
633         LASSERT(namelen > 0);
634
635         read_lock(&obd_dev_lock);
636         for (i = 0; i < class_devno_max(); i++) {
637                 obd = class_num2obd(i);
638
639                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
640                         continue;
641
642                 /* only notify mdc, osc, mdt, ost */
643                 type = obd->obd_type->typ_name;
644                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
645                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
646                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
647                     strcmp(type, LUSTRE_OST_NAME) != 0)
648                         continue;
649
650                 if (strncmp(obd->obd_name, fsname, namelen))
651                         continue;
652
653                 class_incref(obd, __FUNCTION__, obd);
654                 read_unlock(&obd_dev_lock);
655                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
656                                          sizeof(KEY_SPTLRPC_CONF),
657                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
658                 rc = rc ? rc : rc2;
659                 class_decref(obd, __FUNCTION__, obd);
660                 read_lock(&obd_dev_lock);
661         }
662         read_unlock(&obd_dev_lock);
663         return rc;
664 }
665 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
666
667 void obd_cleanup_caches(void)
668 {
669         ENTRY;
670         if (obd_device_cachep) {
671                 kmem_cache_destroy(obd_device_cachep);
672                 obd_device_cachep = NULL;
673         }
674         if (obdo_cachep) {
675                 kmem_cache_destroy(obdo_cachep);
676                 obdo_cachep = NULL;
677         }
678         if (import_cachep) {
679                 kmem_cache_destroy(import_cachep);
680                 import_cachep = NULL;
681         }
682         if (capa_cachep) {
683                 kmem_cache_destroy(capa_cachep);
684                 capa_cachep = NULL;
685         }
686         EXIT;
687 }
688
689 int obd_init_caches(void)
690 {
691         int rc;
692         ENTRY;
693
694         LASSERT(obd_device_cachep == NULL);
695         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
696                                               sizeof(struct obd_device),
697                                               0, 0, NULL);
698         if (!obd_device_cachep)
699                 GOTO(out, rc = -ENOMEM);
700
701         LASSERT(obdo_cachep == NULL);
702         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
703                                         0, 0, NULL);
704         if (!obdo_cachep)
705                 GOTO(out, rc = -ENOMEM);
706
707         LASSERT(import_cachep == NULL);
708         import_cachep = kmem_cache_create("ll_import_cache",
709                                           sizeof(struct obd_import),
710                                           0, 0, NULL);
711         if (!import_cachep)
712                 GOTO(out, rc = -ENOMEM);
713
714         LASSERT(capa_cachep == NULL);
715         capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
716                                         0, 0, NULL);
717         if (!capa_cachep)
718                 GOTO(out, rc = -ENOMEM);
719
720         RETURN(0);
721 out:
722         obd_cleanup_caches();
723         RETURN(rc);
724 }
725
726 /* map connection to client */
727 struct obd_export *class_conn2export(struct lustre_handle *conn)
728 {
729         struct obd_export *export;
730         ENTRY;
731
732         if (!conn) {
733                 CDEBUG(D_CACHE, "looking for null handle\n");
734                 RETURN(NULL);
735         }
736
737         if (conn->cookie == -1) {  /* this means assign a new connection */
738                 CDEBUG(D_CACHE, "want a new connection\n");
739                 RETURN(NULL);
740         }
741
742         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
743         export = class_handle2object(conn->cookie, NULL);
744         RETURN(export);
745 }
746 EXPORT_SYMBOL(class_conn2export);
747
748 struct obd_device *class_exp2obd(struct obd_export *exp)
749 {
750         if (exp)
751                 return exp->exp_obd;
752         return NULL;
753 }
754 EXPORT_SYMBOL(class_exp2obd);
755
756 struct obd_device *class_conn2obd(struct lustre_handle *conn)
757 {
758         struct obd_export *export;
759         export = class_conn2export(conn);
760         if (export) {
761                 struct obd_device *obd = export->exp_obd;
762                 class_export_put(export);
763                 return obd;
764         }
765         return NULL;
766 }
767 EXPORT_SYMBOL(class_conn2obd);
768
769 struct obd_import *class_exp2cliimp(struct obd_export *exp)
770 {
771         struct obd_device *obd = exp->exp_obd;
772         if (obd == NULL)
773                 return NULL;
774         return obd->u.cli.cl_import;
775 }
776 EXPORT_SYMBOL(class_exp2cliimp);
777
778 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
779 {
780         struct obd_device *obd = class_conn2obd(conn);
781         if (obd == NULL)
782                 return NULL;
783         return obd->u.cli.cl_import;
784 }
785 EXPORT_SYMBOL(class_conn2cliimp);
786
787 /* Export management functions */
788 static void class_export_destroy(struct obd_export *exp)
789 {
790         struct obd_device *obd = exp->exp_obd;
791         ENTRY;
792
793         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
794         LASSERT(obd != NULL);
795
796         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
797                exp->exp_client_uuid.uuid, obd->obd_name);
798
799         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
800         if (exp->exp_connection)
801                 ptlrpc_put_connection_superhack(exp->exp_connection);
802
803         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
804         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
805         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
806         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
807         obd_destroy_export(exp);
808         class_decref(obd, "export", exp);
809
810         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
811         EXIT;
812 }
813
814 static void export_handle_addref(void *export)
815 {
816         class_export_get(export);
817 }
818
819 static struct portals_handle_ops export_handle_ops = {
820         .hop_addref = export_handle_addref,
821         .hop_free   = NULL,
822 };
823
824 struct obd_export *class_export_get(struct obd_export *exp)
825 {
826         atomic_inc(&exp->exp_refcount);
827         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
828                atomic_read(&exp->exp_refcount));
829         return exp;
830 }
831 EXPORT_SYMBOL(class_export_get);
832
833 void class_export_put(struct obd_export *exp)
834 {
835         LASSERT(exp != NULL);
836         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
837         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
838                atomic_read(&exp->exp_refcount) - 1);
839
840         if (atomic_dec_and_test(&exp->exp_refcount)) {
841                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
842                 CDEBUG(D_IOCTL, "final put %p/%s\n",
843                        exp, exp->exp_client_uuid.uuid);
844
845                 /* release nid stat refererence */
846                 lprocfs_exp_cleanup(exp);
847
848                 obd_zombie_export_add(exp);
849         }
850 }
851 EXPORT_SYMBOL(class_export_put);
852
853 /* Creates a new export, adds it to the hash table, and returns a
854  * pointer to it. The refcount is 2: one for the hash reference, and
855  * one for the pointer returned by this function. */
856 struct obd_export *class_new_export(struct obd_device *obd,
857                                     struct obd_uuid *cluuid)
858 {
859         struct obd_export *export;
860         cfs_hash_t *hash = NULL;
861         int rc = 0;
862         ENTRY;
863
864         OBD_ALLOC_PTR(export);
865         if (!export)
866                 return ERR_PTR(-ENOMEM);
867
868         export->exp_conn_cnt = 0;
869         export->exp_lock_hash = NULL;
870         export->exp_flock_hash = NULL;
871         atomic_set(&export->exp_refcount, 2);
872         atomic_set(&export->exp_rpc_count, 0);
873         atomic_set(&export->exp_cb_count, 0);
874         atomic_set(&export->exp_locks_count, 0);
875 #if LUSTRE_TRACKS_LOCK_EXP_REFS
876         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
877         spin_lock_init(&export->exp_locks_list_guard);
878 #endif
879         atomic_set(&export->exp_replay_count, 0);
880         export->exp_obd = obd;
881         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
882         spin_lock_init(&export->exp_uncommitted_replies_lock);
883         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
884         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
885         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
886         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
887         CFS_INIT_LIST_HEAD(&export->exp_reg_rpcs);
888         class_handle_hash(&export->exp_handle, &export_handle_ops);
889         export->exp_last_request_time = cfs_time_current_sec();
890         spin_lock_init(&export->exp_lock);
891         spin_lock_init(&export->exp_rpc_lock);
892         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
893         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
894         spin_lock_init(&export->exp_bl_list_lock);
895         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
896
897         export->exp_sp_peer = LUSTRE_SP_ANY;
898         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
899         export->exp_client_uuid = *cluuid;
900         obd_init_export(export);
901
902         spin_lock(&obd->obd_dev_lock);
903         /* shouldn't happen, but might race */
904         if (obd->obd_stopping)
905                 GOTO(exit_unlock, rc = -ENODEV);
906
907         hash = cfs_hash_getref(obd->obd_uuid_hash);
908         if (hash == NULL)
909                 GOTO(exit_unlock, rc = -ENODEV);
910         spin_unlock(&obd->obd_dev_lock);
911
912         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
913                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
914                 if (rc != 0) {
915                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
916                                       obd->obd_name, cluuid->uuid, rc);
917                         GOTO(exit_err, rc = -EALREADY);
918                 }
919         }
920
921         spin_lock(&obd->obd_dev_lock);
922         if (obd->obd_stopping) {
923                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
924                 GOTO(exit_unlock, rc = -ENODEV);
925         }
926
927         class_incref(obd, "export", export);
928         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
929         cfs_list_add_tail(&export->exp_obd_chain_timed,
930                           &export->exp_obd->obd_exports_timed);
931         export->exp_obd->obd_num_exports++;
932         spin_unlock(&obd->obd_dev_lock);
933         cfs_hash_putref(hash);
934         RETURN(export);
935
936 exit_unlock:
937         spin_unlock(&obd->obd_dev_lock);
938 exit_err:
939         if (hash)
940                 cfs_hash_putref(hash);
941         class_handle_unhash(&export->exp_handle);
942         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
943         obd_destroy_export(export);
944         OBD_FREE_PTR(export);
945         return ERR_PTR(rc);
946 }
947 EXPORT_SYMBOL(class_new_export);
948
949 void class_unlink_export(struct obd_export *exp)
950 {
951         class_handle_unhash(&exp->exp_handle);
952
953         spin_lock(&exp->exp_obd->obd_dev_lock);
954         /* delete an uuid-export hashitem from hashtables */
955         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
956                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
957                              &exp->exp_client_uuid,
958                              &exp->exp_uuid_hash);
959
960         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
961         cfs_list_del_init(&exp->exp_obd_chain_timed);
962         exp->exp_obd->obd_num_exports--;
963         spin_unlock(&exp->exp_obd->obd_dev_lock);
964         class_export_put(exp);
965 }
966 EXPORT_SYMBOL(class_unlink_export);
967
968 /* Import management functions */
969 void class_import_destroy(struct obd_import *imp)
970 {
971         ENTRY;
972
973         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
974                 imp->imp_obd->obd_name);
975
976         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
977
978         ptlrpc_put_connection_superhack(imp->imp_connection);
979
980         while (!cfs_list_empty(&imp->imp_conn_list)) {
981                 struct obd_import_conn *imp_conn;
982
983                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
984                                           struct obd_import_conn, oic_item);
985                 cfs_list_del_init(&imp_conn->oic_item);
986                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
987                 OBD_FREE(imp_conn, sizeof(*imp_conn));
988         }
989
990         LASSERT(imp->imp_sec == NULL);
991         class_decref(imp->imp_obd, "import", imp);
992         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
993         EXIT;
994 }
995
996 static void import_handle_addref(void *import)
997 {
998         class_import_get(import);
999 }
1000
1001 static struct portals_handle_ops import_handle_ops = {
1002         .hop_addref = import_handle_addref,
1003         .hop_free   = NULL,
1004 };
1005
1006 struct obd_import *class_import_get(struct obd_import *import)
1007 {
1008         atomic_inc(&import->imp_refcount);
1009         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1010                atomic_read(&import->imp_refcount),
1011                import->imp_obd->obd_name);
1012         return import;
1013 }
1014 EXPORT_SYMBOL(class_import_get);
1015
1016 void class_import_put(struct obd_import *imp)
1017 {
1018         ENTRY;
1019
1020         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1021         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1022
1023         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1024                atomic_read(&imp->imp_refcount) - 1,
1025                imp->imp_obd->obd_name);
1026
1027         if (atomic_dec_and_test(&imp->imp_refcount)) {
1028                 CDEBUG(D_INFO, "final put import %p\n", imp);
1029                 obd_zombie_import_add(imp);
1030         }
1031
1032         /* catch possible import put race */
1033         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1034         EXIT;
1035 }
1036 EXPORT_SYMBOL(class_import_put);
1037
1038 static void init_imp_at(struct imp_at *at) {
1039         int i;
1040         at_init(&at->iat_net_latency, 0, 0);
1041         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1042                 /* max service estimates are tracked on the server side, so
1043                    don't use the AT history here, just use the last reported
1044                    val. (But keep hist for proc histogram, worst_ever) */
1045                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1046                         AT_FLG_NOHIST);
1047         }
1048 }
1049
1050 struct obd_import *class_new_import(struct obd_device *obd)
1051 {
1052         struct obd_import *imp;
1053
1054         OBD_ALLOC(imp, sizeof(*imp));
1055         if (imp == NULL)
1056                 return NULL;
1057
1058         CFS_INIT_LIST_HEAD(&imp->imp_pinger_chain);
1059         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1060         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1061         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1062         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1063         CFS_INIT_LIST_HEAD(&imp->imp_committed_list);
1064         imp->imp_replay_cursor = &imp->imp_committed_list;
1065         spin_lock_init(&imp->imp_lock);
1066         imp->imp_last_success_conn = 0;
1067         imp->imp_state = LUSTRE_IMP_NEW;
1068         imp->imp_obd = class_incref(obd, "import", imp);
1069         mutex_init(&imp->imp_sec_mutex);
1070         init_waitqueue_head(&imp->imp_recovery_waitq);
1071
1072         atomic_set(&imp->imp_refcount, 2);
1073         atomic_set(&imp->imp_unregistering, 0);
1074         atomic_set(&imp->imp_inflight, 0);
1075         atomic_set(&imp->imp_replay_inflight, 0);
1076         atomic_set(&imp->imp_inval_count, 0);
1077         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1078         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1079         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1080         init_imp_at(&imp->imp_at);
1081
1082         /* the default magic is V2, will be used in connect RPC, and
1083          * then adjusted according to the flags in request/reply. */
1084         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1085
1086         return imp;
1087 }
1088 EXPORT_SYMBOL(class_new_import);
1089
1090 void class_destroy_import(struct obd_import *import)
1091 {
1092         LASSERT(import != NULL);
1093         LASSERT(import != LP_POISON);
1094
1095         class_handle_unhash(&import->imp_handle);
1096
1097         spin_lock(&import->imp_lock);
1098         import->imp_generation++;
1099         spin_unlock(&import->imp_lock);
1100         class_import_put(import);
1101 }
1102 EXPORT_SYMBOL(class_destroy_import);
1103
1104 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1105
1106 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1107 {
1108         spin_lock(&exp->exp_locks_list_guard);
1109
1110         LASSERT(lock->l_exp_refs_nr >= 0);
1111
1112         if (lock->l_exp_refs_target != NULL &&
1113             lock->l_exp_refs_target != exp) {
1114                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1115                               exp, lock, lock->l_exp_refs_target);
1116         }
1117         if ((lock->l_exp_refs_nr ++) == 0) {
1118                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1119                 lock->l_exp_refs_target = exp;
1120         }
1121         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1122                lock, exp, lock->l_exp_refs_nr);
1123         spin_unlock(&exp->exp_locks_list_guard);
1124 }
1125 EXPORT_SYMBOL(__class_export_add_lock_ref);
1126
1127 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1128 {
1129         spin_lock(&exp->exp_locks_list_guard);
1130         LASSERT(lock->l_exp_refs_nr > 0);
1131         if (lock->l_exp_refs_target != exp) {
1132                 LCONSOLE_WARN("lock %p, "
1133                               "mismatching export pointers: %p, %p\n",
1134                               lock, lock->l_exp_refs_target, exp);
1135         }
1136         if (-- lock->l_exp_refs_nr == 0) {
1137                 cfs_list_del_init(&lock->l_exp_refs_link);
1138                 lock->l_exp_refs_target = NULL;
1139         }
1140         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1141                lock, exp, lock->l_exp_refs_nr);
1142         spin_unlock(&exp->exp_locks_list_guard);
1143 }
1144 EXPORT_SYMBOL(__class_export_del_lock_ref);
1145 #endif
1146
1147 /* A connection defines an export context in which preallocation can
1148    be managed. This releases the export pointer reference, and returns
1149    the export handle, so the export refcount is 1 when this function
1150    returns. */
1151 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1152                   struct obd_uuid *cluuid)
1153 {
1154         struct obd_export *export;
1155         LASSERT(conn != NULL);
1156         LASSERT(obd != NULL);
1157         LASSERT(cluuid != NULL);
1158         ENTRY;
1159
1160         export = class_new_export(obd, cluuid);
1161         if (IS_ERR(export))
1162                 RETURN(PTR_ERR(export));
1163
1164         conn->cookie = export->exp_handle.h_cookie;
1165         class_export_put(export);
1166
1167         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1168                cluuid->uuid, conn->cookie);
1169         RETURN(0);
1170 }
1171 EXPORT_SYMBOL(class_connect);
1172
1173 /* if export is involved in recovery then clean up related things */
1174 void class_export_recovery_cleanup(struct obd_export *exp)
1175 {
1176         struct obd_device *obd = exp->exp_obd;
1177
1178         spin_lock(&obd->obd_recovery_task_lock);
1179         if (exp->exp_delayed)
1180                 obd->obd_delayed_clients--;
1181         if (obd->obd_recovering) {
1182                 if (exp->exp_in_recovery) {
1183                         spin_lock(&exp->exp_lock);
1184                         exp->exp_in_recovery = 0;
1185                         spin_unlock(&exp->exp_lock);
1186                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1187                         atomic_dec(&obd->obd_connected_clients);
1188                 }
1189
1190                 /* if called during recovery then should update
1191                  * obd_stale_clients counter,
1192                  * lightweight exports are not counted */
1193                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1194                         exp->exp_obd->obd_stale_clients++;
1195         }
1196         spin_unlock(&obd->obd_recovery_task_lock);
1197         /** Cleanup req replay fields */
1198         if (exp->exp_req_replay_needed) {
1199                 spin_lock(&exp->exp_lock);
1200                 exp->exp_req_replay_needed = 0;
1201                 spin_unlock(&exp->exp_lock);
1202                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1203                 atomic_dec(&obd->obd_req_replay_clients);
1204         }
1205         /** Cleanup lock replay data */
1206         if (exp->exp_lock_replay_needed) {
1207                 spin_lock(&exp->exp_lock);
1208                 exp->exp_lock_replay_needed = 0;
1209                 spin_unlock(&exp->exp_lock);
1210                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1211                 atomic_dec(&obd->obd_lock_replay_clients);
1212         }
1213 }
1214
1215 /* This function removes 1-3 references from the export:
1216  * 1 - for export pointer passed
1217  * and if disconnect really need
1218  * 2 - removing from hash
1219  * 3 - in client_unlink_export
1220  * The export pointer passed to this function can destroyed */
1221 int class_disconnect(struct obd_export *export)
1222 {
1223         int already_disconnected;
1224         ENTRY;
1225
1226         if (export == NULL) {
1227                 CWARN("attempting to free NULL export %p\n", export);
1228                 RETURN(-EINVAL);
1229         }
1230
1231         spin_lock(&export->exp_lock);
1232         already_disconnected = export->exp_disconnected;
1233         export->exp_disconnected = 1;
1234         spin_unlock(&export->exp_lock);
1235
1236         /* class_cleanup(), abort_recovery(), and class_fail_export()
1237          * all end up in here, and if any of them race we shouldn't
1238          * call extra class_export_puts(). */
1239         if (already_disconnected) {
1240                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1241                 GOTO(no_disconn, already_disconnected);
1242         }
1243
1244         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1245                export->exp_handle.h_cookie);
1246
1247         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1248                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1249                              &export->exp_connection->c_peer.nid,
1250                              &export->exp_nid_hash);
1251
1252         class_export_recovery_cleanup(export);
1253         class_unlink_export(export);
1254 no_disconn:
1255         class_export_put(export);
1256         RETURN(0);
1257 }
1258 EXPORT_SYMBOL(class_disconnect);
1259
1260 /* Return non-zero for a fully connected export */
1261 int class_connected_export(struct obd_export *exp)
1262 {
1263         int connected = 0;
1264
1265         if (exp) {
1266                 spin_lock(&exp->exp_lock);
1267                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1268                 spin_unlock(&exp->exp_lock);
1269         }
1270         return connected;
1271 }
1272 EXPORT_SYMBOL(class_connected_export);
1273
1274 static void class_disconnect_export_list(cfs_list_t *list,
1275                                          enum obd_option flags)
1276 {
1277         int rc;
1278         struct obd_export *exp;
1279         ENTRY;
1280
1281         /* It's possible that an export may disconnect itself, but
1282          * nothing else will be added to this list. */
1283         while (!cfs_list_empty(list)) {
1284                 exp = cfs_list_entry(list->next, struct obd_export,
1285                                      exp_obd_chain);
1286                 /* need for safe call CDEBUG after obd_disconnect */
1287                 class_export_get(exp);
1288
1289                 spin_lock(&exp->exp_lock);
1290                 exp->exp_flags = flags;
1291                 spin_unlock(&exp->exp_lock);
1292
1293                 if (obd_uuid_equals(&exp->exp_client_uuid,
1294                                     &exp->exp_obd->obd_uuid)) {
1295                         CDEBUG(D_HA,
1296                                "exp %p export uuid == obd uuid, don't discon\n",
1297                                exp);
1298                         /* Need to delete this now so we don't end up pointing
1299                          * to work_list later when this export is cleaned up. */
1300                         cfs_list_del_init(&exp->exp_obd_chain);
1301                         class_export_put(exp);
1302                         continue;
1303                 }
1304
1305                 class_export_get(exp);
1306                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1307                        "last request at "CFS_TIME_T"\n",
1308                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1309                        exp, exp->exp_last_request_time);
1310                 /* release one export reference anyway */
1311                 rc = obd_disconnect(exp);
1312
1313                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1314                        obd_export_nid2str(exp), exp, rc);
1315                 class_export_put(exp);
1316         }
1317         EXIT;
1318 }
1319
1320 void class_disconnect_exports(struct obd_device *obd)
1321 {
1322         cfs_list_t work_list;
1323         ENTRY;
1324
1325         /* Move all of the exports from obd_exports to a work list, en masse. */
1326         CFS_INIT_LIST_HEAD(&work_list);
1327         spin_lock(&obd->obd_dev_lock);
1328         cfs_list_splice_init(&obd->obd_exports, &work_list);
1329         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1330         spin_unlock(&obd->obd_dev_lock);
1331
1332         if (!cfs_list_empty(&work_list)) {
1333                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1334                        "disconnecting them\n", obd->obd_minor, obd);
1335                 class_disconnect_export_list(&work_list,
1336                                              exp_flags_from_obd(obd));
1337         } else
1338                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1339                        obd->obd_minor, obd);
1340         EXIT;
1341 }
1342 EXPORT_SYMBOL(class_disconnect_exports);
1343
1344 /* Remove exports that have not completed recovery.
1345  */
1346 void class_disconnect_stale_exports(struct obd_device *obd,
1347                                     int (*test_export)(struct obd_export *))
1348 {
1349         cfs_list_t work_list;
1350         struct obd_export *exp, *n;
1351         int evicted = 0;
1352         ENTRY;
1353
1354         CFS_INIT_LIST_HEAD(&work_list);
1355         spin_lock(&obd->obd_dev_lock);
1356         cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1357                                      exp_obd_chain) {
1358                 /* don't count self-export as client */
1359                 if (obd_uuid_equals(&exp->exp_client_uuid,
1360                                     &exp->exp_obd->obd_uuid))
1361                         continue;
1362
1363                 /* don't evict clients which have no slot in last_rcvd
1364                  * (e.g. lightweight connection) */
1365                 if (exp->exp_target_data.ted_lr_idx == -1)
1366                         continue;
1367
1368                 spin_lock(&exp->exp_lock);
1369                 if (exp->exp_failed || test_export(exp)) {
1370                         spin_unlock(&exp->exp_lock);
1371                         continue;
1372                 }
1373                 exp->exp_failed = 1;
1374                 spin_unlock(&exp->exp_lock);
1375
1376                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1377                 evicted++;
1378                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1379                        obd->obd_name, exp->exp_client_uuid.uuid,
1380                        exp->exp_connection == NULL ? "<unknown>" :
1381                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1382                 print_export_data(exp, "EVICTING", 0);
1383         }
1384         spin_unlock(&obd->obd_dev_lock);
1385
1386         if (evicted)
1387                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1388                               obd->obd_name, evicted);
1389
1390         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1391                                                  OBD_OPT_ABORT_RECOV);
1392         EXIT;
1393 }
1394 EXPORT_SYMBOL(class_disconnect_stale_exports);
1395
1396 void class_fail_export(struct obd_export *exp)
1397 {
1398         int rc, already_failed;
1399
1400         spin_lock(&exp->exp_lock);
1401         already_failed = exp->exp_failed;
1402         exp->exp_failed = 1;
1403         spin_unlock(&exp->exp_lock);
1404
1405         if (already_failed) {
1406                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1407                        exp, exp->exp_client_uuid.uuid);
1408                 return;
1409         }
1410
1411         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1412                exp, exp->exp_client_uuid.uuid);
1413
1414         if (obd_dump_on_timeout)
1415                 libcfs_debug_dumplog();
1416
1417         /* need for safe call CDEBUG after obd_disconnect */
1418         class_export_get(exp);
1419
1420         /* Most callers into obd_disconnect are removing their own reference
1421          * (request, for example) in addition to the one from the hash table.
1422          * We don't have such a reference here, so make one. */
1423         class_export_get(exp);
1424         rc = obd_disconnect(exp);
1425         if (rc)
1426                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1427         else
1428                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1429                        exp, exp->exp_client_uuid.uuid);
1430         class_export_put(exp);
1431 }
1432 EXPORT_SYMBOL(class_fail_export);
1433
1434 char *obd_export_nid2str(struct obd_export *exp)
1435 {
1436         if (exp->exp_connection != NULL)
1437                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1438
1439         return "(no nid)";
1440 }
1441 EXPORT_SYMBOL(obd_export_nid2str);
1442
1443 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1444 {
1445         cfs_hash_t *nid_hash;
1446         struct obd_export *doomed_exp = NULL;
1447         int exports_evicted = 0;
1448
1449         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1450
1451         spin_lock(&obd->obd_dev_lock);
1452         /* umount has run already, so evict thread should leave
1453          * its task to umount thread now */
1454         if (obd->obd_stopping) {
1455                 spin_unlock(&obd->obd_dev_lock);
1456                 return exports_evicted;
1457         }
1458         nid_hash = obd->obd_nid_hash;
1459         cfs_hash_getref(nid_hash);
1460         spin_unlock(&obd->obd_dev_lock);
1461
1462         do {
1463                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1464                 if (doomed_exp == NULL)
1465                         break;
1466
1467                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1468                          "nid %s found, wanted nid %s, requested nid %s\n",
1469                          obd_export_nid2str(doomed_exp),
1470                          libcfs_nid2str(nid_key), nid);
1471                 LASSERTF(doomed_exp != obd->obd_self_export,
1472                          "self-export is hashed by NID?\n");
1473                 exports_evicted++;
1474                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1475                               "request\n", obd->obd_name,
1476                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1477                               obd_export_nid2str(doomed_exp));
1478                 class_fail_export(doomed_exp);
1479                 class_export_put(doomed_exp);
1480         } while (1);
1481
1482         cfs_hash_putref(nid_hash);
1483
1484         if (!exports_evicted)
1485                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1486                        obd->obd_name, nid);
1487         return exports_evicted;
1488 }
1489 EXPORT_SYMBOL(obd_export_evict_by_nid);
1490
1491 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1492 {
1493         cfs_hash_t *uuid_hash;
1494         struct obd_export *doomed_exp = NULL;
1495         struct obd_uuid doomed_uuid;
1496         int exports_evicted = 0;
1497
1498         spin_lock(&obd->obd_dev_lock);
1499         if (obd->obd_stopping) {
1500                 spin_unlock(&obd->obd_dev_lock);
1501                 return exports_evicted;
1502         }
1503         uuid_hash = obd->obd_uuid_hash;
1504         cfs_hash_getref(uuid_hash);
1505         spin_unlock(&obd->obd_dev_lock);
1506
1507         obd_str2uuid(&doomed_uuid, uuid);
1508         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1509                 CERROR("%s: can't evict myself\n", obd->obd_name);
1510                 cfs_hash_putref(uuid_hash);
1511                 return exports_evicted;
1512         }
1513
1514         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1515
1516         if (doomed_exp == NULL) {
1517                 CERROR("%s: can't disconnect %s: no exports found\n",
1518                        obd->obd_name, uuid);
1519         } else {
1520                 CWARN("%s: evicting %s at adminstrative request\n",
1521                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1522                 class_fail_export(doomed_exp);
1523                 class_export_put(doomed_exp);
1524                 exports_evicted++;
1525         }
1526         cfs_hash_putref(uuid_hash);
1527
1528         return exports_evicted;
1529 }
1530 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1531
1532 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1533 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1534 EXPORT_SYMBOL(class_export_dump_hook);
1535 #endif
1536
1537 static void print_export_data(struct obd_export *exp, const char *status,
1538                               int locks)
1539 {
1540         struct ptlrpc_reply_state *rs;
1541         struct ptlrpc_reply_state *first_reply = NULL;
1542         int nreplies = 0;
1543
1544         spin_lock(&exp->exp_lock);
1545         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1546                                 rs_exp_list) {
1547                 if (nreplies == 0)
1548                         first_reply = rs;
1549                 nreplies++;
1550         }
1551         spin_unlock(&exp->exp_lock);
1552
1553         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1554                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1555                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1556                atomic_read(&exp->exp_rpc_count),
1557                atomic_read(&exp->exp_cb_count),
1558                atomic_read(&exp->exp_locks_count),
1559                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1560                nreplies, first_reply, nreplies > 3 ? "..." : "",
1561                exp->exp_last_committed);
1562 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1563         if (locks && class_export_dump_hook != NULL)
1564                 class_export_dump_hook(exp);
1565 #endif
1566 }
1567
1568 void dump_exports(struct obd_device *obd, int locks)
1569 {
1570         struct obd_export *exp;
1571
1572         spin_lock(&obd->obd_dev_lock);
1573         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1574                 print_export_data(exp, "ACTIVE", locks);
1575         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1576                 print_export_data(exp, "UNLINKED", locks);
1577         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1578                 print_export_data(exp, "DELAYED", locks);
1579         spin_unlock(&obd->obd_dev_lock);
1580         spin_lock(&obd_zombie_impexp_lock);
1581         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1582                 print_export_data(exp, "ZOMBIE", locks);
1583         spin_unlock(&obd_zombie_impexp_lock);
1584 }
1585 EXPORT_SYMBOL(dump_exports);
1586
1587 void obd_exports_barrier(struct obd_device *obd)
1588 {
1589         int waited = 2;
1590         LASSERT(cfs_list_empty(&obd->obd_exports));
1591         spin_lock(&obd->obd_dev_lock);
1592         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1593                 spin_unlock(&obd->obd_dev_lock);
1594                 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1595                                                    cfs_time_seconds(waited));
1596                 if (waited > 5 && IS_PO2(waited)) {
1597                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1598                                       "more than %d seconds. "
1599                                       "The obd refcount = %d. Is it stuck?\n",
1600                                       obd->obd_name, waited,
1601                                       atomic_read(&obd->obd_refcount));
1602                         dump_exports(obd, 1);
1603                 }
1604                 waited *= 2;
1605                 spin_lock(&obd->obd_dev_lock);
1606         }
1607         spin_unlock(&obd->obd_dev_lock);
1608 }
1609 EXPORT_SYMBOL(obd_exports_barrier);
1610
/* Total number of zombie imports and exports still waiting to be destroyed;
 * modified under obd_zombie_impexp_lock. */
static int zombies_count;
1613
1614 /**
1615  * kill zombie imports and exports
1616  */
1617 void obd_zombie_impexp_cull(void)
1618 {
1619         struct obd_import *import;
1620         struct obd_export *export;
1621         ENTRY;
1622
1623         do {
1624                 spin_lock(&obd_zombie_impexp_lock);
1625
1626                 import = NULL;
1627                 if (!cfs_list_empty(&obd_zombie_imports)) {
1628                         import = cfs_list_entry(obd_zombie_imports.next,
1629                                                 struct obd_import,
1630                                                 imp_zombie_chain);
1631                         cfs_list_del_init(&import->imp_zombie_chain);
1632                 }
1633
1634                 export = NULL;
1635                 if (!cfs_list_empty(&obd_zombie_exports)) {
1636                         export = cfs_list_entry(obd_zombie_exports.next,
1637                                                 struct obd_export,
1638                                                 exp_obd_chain);
1639                         cfs_list_del_init(&export->exp_obd_chain);
1640                 }
1641
1642                 spin_unlock(&obd_zombie_impexp_lock);
1643
1644                 if (import != NULL) {
1645                         class_import_destroy(import);
1646                         spin_lock(&obd_zombie_impexp_lock);
1647                         zombies_count--;
1648                         spin_unlock(&obd_zombie_impexp_lock);
1649                 }
1650
1651                 if (export != NULL) {
1652                         class_export_destroy(export);
1653                         spin_lock(&obd_zombie_impexp_lock);
1654                         zombies_count--;
1655                         spin_unlock(&obd_zombie_impexp_lock);
1656                 }
1657
1658                 cond_resched();
1659         } while (import != NULL || export != NULL);
1660         EXIT;
1661 }
1662
1663 static struct completion        obd_zombie_start;
1664 static struct completion        obd_zombie_stop;
1665 static unsigned long            obd_zombie_flags;
1666 static wait_queue_head_t        obd_zombie_waitq;
1667 static pid_t                    obd_zombie_pid;
1668
1669 enum {
1670         OBD_ZOMBIE_STOP         = 0x0001,
1671 };
1672
1673 /**
1674  * check for work for kill zombie import/export thread.
1675  */
1676 static int obd_zombie_impexp_check(void *arg)
1677 {
1678         int rc;
1679
1680         spin_lock(&obd_zombie_impexp_lock);
1681         rc = (zombies_count == 0) &&
1682              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1683         spin_unlock(&obd_zombie_impexp_lock);
1684
1685         RETURN(rc);
1686 }
1687
1688 /**
1689  * Add export to the obd_zombe thread and notify it.
1690  */
1691 static void obd_zombie_export_add(struct obd_export *exp) {
1692         spin_lock(&exp->exp_obd->obd_dev_lock);
1693         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1694         cfs_list_del_init(&exp->exp_obd_chain);
1695         spin_unlock(&exp->exp_obd->obd_dev_lock);
1696         spin_lock(&obd_zombie_impexp_lock);
1697         zombies_count++;
1698         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1699         spin_unlock(&obd_zombie_impexp_lock);
1700
1701         obd_zombie_impexp_notify();
1702 }
1703
1704 /**
1705  * Add import to the obd_zombe thread and notify it.
1706  */
1707 static void obd_zombie_import_add(struct obd_import *imp) {
1708         LASSERT(imp->imp_sec == NULL);
1709         LASSERT(imp->imp_rq_pool == NULL);
1710         spin_lock(&obd_zombie_impexp_lock);
1711         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1712         zombies_count++;
1713         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1714         spin_unlock(&obd_zombie_impexp_lock);
1715
1716         obd_zombie_impexp_notify();
1717 }
1718
1719 /**
1720  * notify import/export destroy thread about new zombie.
1721  */
1722 static void obd_zombie_impexp_notify(void)
1723 {
1724         /*
1725          * Make sure obd_zomebie_impexp_thread get this notification.
1726          * It is possible this signal only get by obd_zombie_barrier, and
1727          * barrier gulps this notification and sleeps away and hangs ensues
1728          */
1729         wake_up_all(&obd_zombie_waitq);
1730 }
1731
1732 /**
1733  * check whether obd_zombie is idle
1734  */
1735 static int obd_zombie_is_idle(void)
1736 {
1737         int rc;
1738
1739         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1740         spin_lock(&obd_zombie_impexp_lock);
1741         rc = (zombies_count == 0);
1742         spin_unlock(&obd_zombie_impexp_lock);
1743         return rc;
1744 }
1745
1746 /**
1747  * wait when obd_zombie import/export queues become empty
1748  */
1749 void obd_zombie_barrier(void)
1750 {
1751         struct l_wait_info lwi = { 0 };
1752
1753         if (obd_zombie_pid == current_pid())
1754                 /* don't wait for myself */
1755                 return;
1756         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1757 }
1758 EXPORT_SYMBOL(obd_zombie_barrier);
1759
1760 #ifdef __KERNEL__
1761
1762 /**
1763  * destroy zombie export/import thread.
1764  */
1765 static int obd_zombie_impexp_thread(void *unused)
1766 {
1767         unshare_fs_struct();
1768         complete(&obd_zombie_start);
1769
1770         obd_zombie_pid = current_pid();
1771
1772         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1773                 struct l_wait_info lwi = { 0 };
1774
1775                 l_wait_event(obd_zombie_waitq,
1776                              !obd_zombie_impexp_check(NULL), &lwi);
1777                 obd_zombie_impexp_cull();
1778
1779                 /*
1780                  * Notify obd_zombie_barrier callers that queues
1781                  * may be empty.
1782                  */
1783                 wake_up(&obd_zombie_waitq);
1784         }
1785
1786         complete(&obd_zombie_stop);
1787
1788         RETURN(0);
1789 }
1790
1791 #else /* ! KERNEL */
1792
1793 static atomic_t zombie_recur = ATOMIC_INIT(0);
1794 static void *obd_zombie_impexp_work_cb;
1795 static void *obd_zombie_impexp_idle_cb;
1796
1797 int obd_zombie_impexp_kill(void *arg)
1798 {
1799         int rc = 0;
1800
1801         if (atomic_inc_return(&zombie_recur) == 1) {
1802                 obd_zombie_impexp_cull();
1803                 rc = 1;
1804         }
1805         atomic_dec(&zombie_recur);
1806         return rc;
1807 }
1808
1809 #endif
1810
1811 /**
1812  * start destroy zombie import/export thread
1813  */
1814 int obd_zombie_impexp_init(void)
1815 {
1816 #ifdef __KERNEL__
1817         struct task_struct *task;
1818 #endif
1819
1820         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1821         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1822         spin_lock_init(&obd_zombie_impexp_lock);
1823         init_completion(&obd_zombie_start);
1824         init_completion(&obd_zombie_stop);
1825         init_waitqueue_head(&obd_zombie_waitq);
1826         obd_zombie_pid = 0;
1827
1828 #ifdef __KERNEL__
1829         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1830         if (IS_ERR(task))
1831                 RETURN(PTR_ERR(task));
1832
1833         wait_for_completion(&obd_zombie_start);
1834 #else
1835
1836         obd_zombie_impexp_work_cb =
1837                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1838                                                  &obd_zombie_impexp_kill, NULL);
1839
1840         obd_zombie_impexp_idle_cb =
1841                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1842                                                  &obd_zombie_impexp_check, NULL);
1843 #endif
1844         RETURN(0);
1845 }
1846 /**
1847  * stop destroy zombie import/export thread
1848  */
1849 void obd_zombie_impexp_stop(void)
1850 {
1851         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1852         obd_zombie_impexp_notify();
1853 #ifdef __KERNEL__
1854         wait_for_completion(&obd_zombie_stop);
1855 #else
1856         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1857         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1858 #endif
1859 }
1860
1861 /***** Kernel-userspace comm helpers *******/
1862
1863 /* Get length of entire message, including header */
1864 int kuc_len(int payload_len)
1865 {
1866         return sizeof(struct kuc_hdr) + payload_len;
1867 }
1868 EXPORT_SYMBOL(kuc_len);
1869
1870 /* Get a pointer to kuc header, given a ptr to the payload
1871  * @param p Pointer to payload area
1872  * @returns Pointer to kuc header
1873  */
1874 struct kuc_hdr * kuc_ptr(void *p)
1875 {
1876         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1877         LASSERT(lh->kuc_magic == KUC_MAGIC);
1878         return lh;
1879 }
1880 EXPORT_SYMBOL(kuc_ptr);
1881
1882 /* Test if payload is part of kuc message
1883  * @param p Pointer to payload area
1884  * @returns boolean
1885  */
1886 int kuc_ispayload(void *p)
1887 {
1888         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1889
1890         if (kh->kuc_magic == KUC_MAGIC)
1891                 return 1;
1892         else
1893                 return 0;
1894 }
1895 EXPORT_SYMBOL(kuc_ispayload);
1896
1897 /* Alloc space for a message, and fill in header
1898  * @return Pointer to payload area
1899  */
1900 void *kuc_alloc(int payload_len, int transport, int type)
1901 {
1902         struct kuc_hdr *lh;
1903         int len = kuc_len(payload_len);
1904
1905         OBD_ALLOC(lh, len);
1906         if (lh == NULL)
1907                 return ERR_PTR(-ENOMEM);
1908
1909         lh->kuc_magic = KUC_MAGIC;
1910         lh->kuc_transport = transport;
1911         lh->kuc_msgtype = type;
1912         lh->kuc_msglen = len;
1913
1914         return (void *)(lh + 1);
1915 }
1916 EXPORT_SYMBOL(kuc_alloc);
1917
/* Free a message obtained from kuc_alloc().
 * @param p Pointer to payload area (not to the header)
 * @param payload_len Payload size; the freed size is recomputed from it via
 *                    kuc_len(), so it should be the value given to kuc_alloc()
 *
 * Not declared inline: the function is exported and needs an ordinary
 * external definition.
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1925
1926
1927