Whamcloud - gitweb
LU-4357 libcfs: restore __GFP_WAIT flag to memalloc calls
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
50 extern cfs_list_t obd_types;
51 spinlock_t obd_types_lock;
52
53 struct kmem_cache *obd_device_cachep;
54 struct kmem_cache *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 struct kmem_cache *import_cachep;
57
58 cfs_list_t      obd_zombie_imports;
59 cfs_list_t      obd_zombie_exports;
60 spinlock_t  obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         cfs_list_t *tmp;
102         struct obd_type *type;
103
104         spin_lock(&obd_types_lock);
105         cfs_list_for_each(tmp, &obd_types) {
106                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115 EXPORT_SYMBOL(class_search_type);
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124
125                 if (strcmp(modname, "obdfilter") == 0)
126                         modname = "ofd";
127
128                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129                         modname = LUSTRE_OSP_NAME;
130
131                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132                         modname = LUSTRE_MDT_NAME;
133
134                 if (!request_module("%s", modname)) {
135                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136                         type = class_search_type(name);
137                 } else {
138                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139                                            modname);
140                 }
141         }
142 #endif
143         if (type) {
144                 spin_lock(&type->obd_type_lock);
145                 type->typ_refcnt++;
146                 try_module_get(type->typ_dt_ops->o_owner);
147                 spin_unlock(&type->obd_type_lock);
148         }
149         return type;
150 }
151 EXPORT_SYMBOL(class_get_type);
152
153 void class_put_type(struct obd_type *type)
154 {
155         LASSERT(type);
156         spin_lock(&type->obd_type_lock);
157         type->typ_refcnt--;
158         module_put(type->typ_dt_ops->o_owner);
159         spin_unlock(&type->obd_type_lock);
160 }
161 EXPORT_SYMBOL(class_put_type);
162
163 #define CLASS_MAX_NAME 1024
164
165 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
166                         struct lprocfs_seq_vars *module_vars,
167 #ifndef HAVE_ONLY_PROCFS_SEQ
168                         struct lprocfs_vars *vars,
169 #endif
170                         const char *name, struct lu_device_type *ldt)
171 {
172         struct obd_type *type;
173         int rc = 0;
174         ENTRY;
175
176         /* sanity check */
177         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
178
179         if (class_search_type(name)) {
180                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
181                 RETURN(-EEXIST);
182         }
183
184         rc = -ENOMEM;
185         OBD_ALLOC(type, sizeof(*type));
186         if (type == NULL)
187                 RETURN(rc);
188
189         OBD_ALLOC_PTR(type->typ_dt_ops);
190         OBD_ALLOC_PTR(type->typ_md_ops);
191         OBD_ALLOC(type->typ_name, strlen(name) + 1);
192
193         if (type->typ_dt_ops == NULL ||
194             type->typ_md_ops == NULL ||
195             type->typ_name == NULL)
196                 GOTO (failed, rc);
197
198         *(type->typ_dt_ops) = *dt_ops;
199         /* md_ops is optional */
200         if (md_ops)
201                 *(type->typ_md_ops) = *md_ops;
202         strcpy(type->typ_name, name);
203         spin_lock_init(&type->obd_type_lock);
204
205 #ifdef LPROCFS
206 #ifndef HAVE_ONLY_PROCFS_SEQ
207         if (vars) {
208                 type->typ_procroot = lprocfs_register(type->typ_name,
209                                                         proc_lustre_root,
210                                                         vars, type);
211         } else
212 #endif
213         {
214                 type->typ_procroot = lprocfs_seq_register(type->typ_name,
215                                                         proc_lustre_root,
216                                                         module_vars, type);
217         }
218         if (IS_ERR(type->typ_procroot)) {
219                 rc = PTR_ERR(type->typ_procroot);
220                 type->typ_procroot = NULL;
221                 GOTO (failed, rc);
222         }
223 #endif
224         if (ldt != NULL) {
225                 type->typ_lu = ldt;
226                 rc = lu_device_type_init(ldt);
227                 if (rc != 0)
228                         GOTO (failed, rc);
229         }
230
231         spin_lock(&obd_types_lock);
232         cfs_list_add(&type->typ_chain, &obd_types);
233         spin_unlock(&obd_types_lock);
234
235         RETURN (0);
236
237  failed:
238         if (type->typ_name != NULL)
239                 OBD_FREE(type->typ_name, strlen(name) + 1);
240         if (type->typ_md_ops != NULL)
241                 OBD_FREE_PTR(type->typ_md_ops);
242         if (type->typ_dt_ops != NULL)
243                 OBD_FREE_PTR(type->typ_dt_ops);
244 #ifdef LPROCFS
245 #ifndef HAVE_ONLY_PROCFS_SEQ
246         lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
247 #else
248         remove_proc_subtree(type->typ_name, proc_lustre_root);
249 #endif
250 #endif
251         OBD_FREE(type, sizeof(*type));
252         RETURN(rc);
253 }
254 EXPORT_SYMBOL(class_register_type);
255
256 int class_unregister_type(const char *name)
257 {
258         struct obd_type *type = class_search_type(name);
259         ENTRY;
260
261         if (!type) {
262                 CERROR("unknown obd type\n");
263                 RETURN(-EINVAL);
264         }
265
266         if (type->typ_refcnt) {
267                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
268                 /* This is a bad situation, let's make the best of it */
269                 /* Remove ops, but leave the name for debugging */
270                 OBD_FREE_PTR(type->typ_dt_ops);
271                 OBD_FREE_PTR(type->typ_md_ops);
272                 RETURN(-EBUSY);
273         }
274
275         /* we do not use type->typ_procroot as for compatibility purposes
276          * other modules can share names (i.e. lod can use lov entry). so
277          * we can't reference pointer as it can get invalided when another
278          * module removes the entry */
279 #ifdef LPROCFS
280 #ifndef HAVE_ONLY_PROCFS_SEQ
281         lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
282 #else
283         remove_proc_subtree(type->typ_name, proc_lustre_root);
284 #endif
285 #endif
286         if (type->typ_lu)
287                 lu_device_type_fini(type->typ_lu);
288
289         spin_lock(&obd_types_lock);
290         cfs_list_del(&type->typ_chain);
291         spin_unlock(&obd_types_lock);
292         OBD_FREE(type->typ_name, strlen(name) + 1);
293         if (type->typ_dt_ops != NULL)
294                 OBD_FREE_PTR(type->typ_dt_ops);
295         if (type->typ_md_ops != NULL)
296                 OBD_FREE_PTR(type->typ_md_ops);
297         OBD_FREE(type, sizeof(*type));
298         RETURN(0);
299 } /* class_unregister_type */
300 EXPORT_SYMBOL(class_unregister_type);
301
302 /**
303  * Create a new obd device.
304  *
305  * Find an empty slot in ::obd_devs[], create a new obd device in it.
306  *
307  * \param[in] type_name obd device type string.
308  * \param[in] name      obd device name.
309  *
310  * \retval NULL if create fails, otherwise return the obd device
311  *         pointer created.
312  */
313 struct obd_device *class_newdev(const char *type_name, const char *name)
314 {
315         struct obd_device *result = NULL;
316         struct obd_device *newdev;
317         struct obd_type *type = NULL;
318         int i;
319         int new_obd_minor = 0;
320         ENTRY;
321
322         if (strlen(name) >= MAX_OBD_NAME) {
323                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
324                 RETURN(ERR_PTR(-EINVAL));
325         }
326
327         type = class_get_type(type_name);
328         if (type == NULL){
329                 CERROR("OBD: unknown type: %s\n", type_name);
330                 RETURN(ERR_PTR(-ENODEV));
331         }
332
333         newdev = obd_device_alloc();
334         if (newdev == NULL)
335                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
336
337         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
338
339         write_lock(&obd_dev_lock);
340         for (i = 0; i < class_devno_max(); i++) {
341                 struct obd_device *obd = class_num2obd(i);
342
343                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
344                         CERROR("Device %s already exists at %d, won't add\n",
345                                name, i);
346                         if (result) {
347                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
348                                          "%p obd_magic %08x != %08x\n", result,
349                                          result->obd_magic, OBD_DEVICE_MAGIC);
350                                 LASSERTF(result->obd_minor == new_obd_minor,
351                                          "%p obd_minor %d != %d\n", result,
352                                          result->obd_minor, new_obd_minor);
353
354                                 obd_devs[result->obd_minor] = NULL;
355                                 result->obd_name[0]='\0';
356                          }
357                         result = ERR_PTR(-EEXIST);
358                         break;
359                 }
360                 if (!result && !obd) {
361                         result = newdev;
362                         result->obd_minor = i;
363                         new_obd_minor = i;
364                         result->obd_type = type;
365                         strncpy(result->obd_name, name,
366                                 sizeof(result->obd_name) - 1);
367                         obd_devs[i] = result;
368                 }
369         }
370         write_unlock(&obd_dev_lock);
371
372         if (result == NULL && i >= class_devno_max()) {
373                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
374                        class_devno_max());
375                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
376         }
377
378         if (IS_ERR(result))
379                 GOTO(out, result);
380
381         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
382                result->obd_name, result);
383
384         RETURN(result);
385 out:
386         obd_device_free(newdev);
387 out_type:
388         class_put_type(type);
389         return result;
390 }
391
392 void class_release_dev(struct obd_device *obd)
393 {
394         struct obd_type *obd_type = obd->obd_type;
395
396         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
397                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
398         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
399                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
400         LASSERT(obd_type != NULL);
401
402         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
403                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
404
405         write_lock(&obd_dev_lock);
406         obd_devs[obd->obd_minor] = NULL;
407         write_unlock(&obd_dev_lock);
408         obd_device_free(obd);
409
410         class_put_type(obd_type);
411 }
412
413 int class_name2dev(const char *name)
414 {
415         int i;
416
417         if (!name)
418                 return -1;
419
420         read_lock(&obd_dev_lock);
421         for (i = 0; i < class_devno_max(); i++) {
422                 struct obd_device *obd = class_num2obd(i);
423
424                 if (obd && strcmp(name, obd->obd_name) == 0) {
425                         /* Make sure we finished attaching before we give
426                            out any references */
427                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
428                         if (obd->obd_attached) {
429                                 read_unlock(&obd_dev_lock);
430                                 return i;
431                         }
432                         break;
433                 }
434         }
435         read_unlock(&obd_dev_lock);
436
437         return -1;
438 }
439 EXPORT_SYMBOL(class_name2dev);
440
441 struct obd_device *class_name2obd(const char *name)
442 {
443         int dev = class_name2dev(name);
444
445         if (dev < 0 || dev > class_devno_max())
446                 return NULL;
447         return class_num2obd(dev);
448 }
449 EXPORT_SYMBOL(class_name2obd);
450
451 int class_uuid2dev(struct obd_uuid *uuid)
452 {
453         int i;
454
455         read_lock(&obd_dev_lock);
456         for (i = 0; i < class_devno_max(); i++) {
457                 struct obd_device *obd = class_num2obd(i);
458
459                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
460                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
461                         read_unlock(&obd_dev_lock);
462                         return i;
463                 }
464         }
465         read_unlock(&obd_dev_lock);
466
467         return -1;
468 }
469 EXPORT_SYMBOL(class_uuid2dev);
470
471 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
472 {
473         int dev = class_uuid2dev(uuid);
474         if (dev < 0)
475                 return NULL;
476         return class_num2obd(dev);
477 }
478 EXPORT_SYMBOL(class_uuid2obd);
479
480 /**
481  * Get obd device from ::obd_devs[]
482  *
483  * \param num [in] array index
484  *
485  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
486  *         otherwise return the obd device there.
487  */
488 struct obd_device *class_num2obd(int num)
489 {
490         struct obd_device *obd = NULL;
491
492         if (num < class_devno_max()) {
493                 obd = obd_devs[num];
494                 if (obd == NULL)
495                         return NULL;
496
497                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
498                          "%p obd_magic %08x != %08x\n",
499                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
500                 LASSERTF(obd->obd_minor == num,
501                          "%p obd_minor %0d != %0d\n",
502                          obd, obd->obd_minor, num);
503         }
504
505         return obd;
506 }
507 EXPORT_SYMBOL(class_num2obd);
508
509 /**
510  * Get obd devices count. Device in any
511  *    state are counted
512  * \retval obd device count
513  */
514 int get_devices_count(void)
515 {
516         int index, max_index = class_devno_max(), dev_count = 0;
517
518         read_lock(&obd_dev_lock);
519         for (index = 0; index <= max_index; index++) {
520                 struct obd_device *obd = class_num2obd(index);
521                 if (obd != NULL)
522                         dev_count++;
523         }
524         read_unlock(&obd_dev_lock);
525
526         return dev_count;
527 }
528 EXPORT_SYMBOL(get_devices_count);
529
530 void class_obd_list(void)
531 {
532         char *status;
533         int i;
534
535         read_lock(&obd_dev_lock);
536         for (i = 0; i < class_devno_max(); i++) {
537                 struct obd_device *obd = class_num2obd(i);
538
539                 if (obd == NULL)
540                         continue;
541                 if (obd->obd_stopping)
542                         status = "ST";
543                 else if (obd->obd_set_up)
544                         status = "UP";
545                 else if (obd->obd_attached)
546                         status = "AT";
547                 else
548                         status = "--";
549                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
550                          i, status, obd->obd_type->typ_name,
551                          obd->obd_name, obd->obd_uuid.uuid,
552                          atomic_read(&obd->obd_refcount));
553         }
554         read_unlock(&obd_dev_lock);
555         return;
556 }
557
558 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
559    specified, then only the client with that uuid is returned,
560    otherwise any client connected to the tgt is returned. */
561 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
562                                           const char * typ_name,
563                                           struct obd_uuid *grp_uuid)
564 {
565         int i;
566
567         read_lock(&obd_dev_lock);
568         for (i = 0; i < class_devno_max(); i++) {
569                 struct obd_device *obd = class_num2obd(i);
570
571                 if (obd == NULL)
572                         continue;
573                 if ((strncmp(obd->obd_type->typ_name, typ_name,
574                              strlen(typ_name)) == 0)) {
575                         if (obd_uuid_equals(tgt_uuid,
576                                             &obd->u.cli.cl_target_uuid) &&
577                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
578                                                          &obd->obd_uuid) : 1)) {
579                                 read_unlock(&obd_dev_lock);
580                                 return obd;
581                         }
582                 }
583         }
584         read_unlock(&obd_dev_lock);
585
586         return NULL;
587 }
588 EXPORT_SYMBOL(class_find_client_obd);
589
590 /* Iterate the obd_device list looking devices have grp_uuid. Start
591    searching at *next, and if a device is found, the next index to look
592    at is saved in *next. If next is NULL, then the first matching device
593    will always be returned. */
594 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
595 {
596         int i;
597
598         if (next == NULL)
599                 i = 0;
600         else if (*next >= 0 && *next < class_devno_max())
601                 i = *next;
602         else
603                 return NULL;
604
605         read_lock(&obd_dev_lock);
606         for (; i < class_devno_max(); i++) {
607                 struct obd_device *obd = class_num2obd(i);
608
609                 if (obd == NULL)
610                         continue;
611                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
612                         if (next != NULL)
613                                 *next = i+1;
614                         read_unlock(&obd_dev_lock);
615                         return obd;
616                 }
617         }
618         read_unlock(&obd_dev_lock);
619
620         return NULL;
621 }
622 EXPORT_SYMBOL(class_devices_in_group);
623
624 /**
625  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
626  * adjust sptlrpc settings accordingly.
627  */
628 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
629 {
630         struct obd_device  *obd;
631         const char         *type;
632         int                 i, rc = 0, rc2;
633
634         LASSERT(namelen > 0);
635
636         read_lock(&obd_dev_lock);
637         for (i = 0; i < class_devno_max(); i++) {
638                 obd = class_num2obd(i);
639
640                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
641                         continue;
642
643                 /* only notify mdc, osc, mdt, ost */
644                 type = obd->obd_type->typ_name;
645                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
646                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
647                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
648                     strcmp(type, LUSTRE_OST_NAME) != 0)
649                         continue;
650
651                 if (strncmp(obd->obd_name, fsname, namelen))
652                         continue;
653
654                 class_incref(obd, __FUNCTION__, obd);
655                 read_unlock(&obd_dev_lock);
656                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
657                                          sizeof(KEY_SPTLRPC_CONF),
658                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
659                 rc = rc ? rc : rc2;
660                 class_decref(obd, __FUNCTION__, obd);
661                 read_lock(&obd_dev_lock);
662         }
663         read_unlock(&obd_dev_lock);
664         return rc;
665 }
666 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
667
668 void obd_cleanup_caches(void)
669 {
670         ENTRY;
671         if (obd_device_cachep) {
672                 kmem_cache_destroy(obd_device_cachep);
673                 obd_device_cachep = NULL;
674         }
675         if (obdo_cachep) {
676                 kmem_cache_destroy(obdo_cachep);
677                 obdo_cachep = NULL;
678         }
679         if (import_cachep) {
680                 kmem_cache_destroy(import_cachep);
681                 import_cachep = NULL;
682         }
683         if (capa_cachep) {
684                 kmem_cache_destroy(capa_cachep);
685                 capa_cachep = NULL;
686         }
687         EXIT;
688 }
689
690 int obd_init_caches(void)
691 {
692         int rc;
693         ENTRY;
694
695         LASSERT(obd_device_cachep == NULL);
696         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
697                                               sizeof(struct obd_device),
698                                               0, 0, NULL);
699         if (!obd_device_cachep)
700                 GOTO(out, rc = -ENOMEM);
701
702         LASSERT(obdo_cachep == NULL);
703         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
704                                         0, 0, NULL);
705         if (!obdo_cachep)
706                 GOTO(out, rc = -ENOMEM);
707
708         LASSERT(import_cachep == NULL);
709         import_cachep = kmem_cache_create("ll_import_cache",
710                                           sizeof(struct obd_import),
711                                           0, 0, NULL);
712         if (!import_cachep)
713                 GOTO(out, rc = -ENOMEM);
714
715         LASSERT(capa_cachep == NULL);
716         capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
717                                         0, 0, NULL);
718         if (!capa_cachep)
719                 GOTO(out, rc = -ENOMEM);
720
721         RETURN(0);
722 out:
723         obd_cleanup_caches();
724         RETURN(rc);
725 }
726
727 /* map connection to client */
728 struct obd_export *class_conn2export(struct lustre_handle *conn)
729 {
730         struct obd_export *export;
731         ENTRY;
732
733         if (!conn) {
734                 CDEBUG(D_CACHE, "looking for null handle\n");
735                 RETURN(NULL);
736         }
737
738         if (conn->cookie == -1) {  /* this means assign a new connection */
739                 CDEBUG(D_CACHE, "want a new connection\n");
740                 RETURN(NULL);
741         }
742
743         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
744         export = class_handle2object(conn->cookie, NULL);
745         RETURN(export);
746 }
747 EXPORT_SYMBOL(class_conn2export);
748
749 struct obd_device *class_exp2obd(struct obd_export *exp)
750 {
751         if (exp)
752                 return exp->exp_obd;
753         return NULL;
754 }
755 EXPORT_SYMBOL(class_exp2obd);
756
757 struct obd_device *class_conn2obd(struct lustre_handle *conn)
758 {
759         struct obd_export *export;
760         export = class_conn2export(conn);
761         if (export) {
762                 struct obd_device *obd = export->exp_obd;
763                 class_export_put(export);
764                 return obd;
765         }
766         return NULL;
767 }
768 EXPORT_SYMBOL(class_conn2obd);
769
770 struct obd_import *class_exp2cliimp(struct obd_export *exp)
771 {
772         struct obd_device *obd = exp->exp_obd;
773         if (obd == NULL)
774                 return NULL;
775         return obd->u.cli.cl_import;
776 }
777 EXPORT_SYMBOL(class_exp2cliimp);
778
779 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
780 {
781         struct obd_device *obd = class_conn2obd(conn);
782         if (obd == NULL)
783                 return NULL;
784         return obd->u.cli.cl_import;
785 }
786 EXPORT_SYMBOL(class_conn2cliimp);
787
788 /* Export management functions */
789 static void class_export_destroy(struct obd_export *exp)
790 {
791         struct obd_device *obd = exp->exp_obd;
792         ENTRY;
793
794         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
795         LASSERT(obd != NULL);
796
797         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
798                exp->exp_client_uuid.uuid, obd->obd_name);
799
800         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
801         if (exp->exp_connection)
802                 ptlrpc_put_connection_superhack(exp->exp_connection);
803
804         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
805         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
806         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
807         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
808         obd_destroy_export(exp);
809         class_decref(obd, "export", exp);
810
811         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
812         EXIT;
813 }
814
815 static void export_handle_addref(void *export)
816 {
817         class_export_get(export);
818 }
819
820 static struct portals_handle_ops export_handle_ops = {
821         .hop_addref = export_handle_addref,
822         .hop_free   = NULL,
823 };
824
825 struct obd_export *class_export_get(struct obd_export *exp)
826 {
827         atomic_inc(&exp->exp_refcount);
828         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
829                atomic_read(&exp->exp_refcount));
830         return exp;
831 }
832 EXPORT_SYMBOL(class_export_get);
833
834 void class_export_put(struct obd_export *exp)
835 {
836         LASSERT(exp != NULL);
837         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
838         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
839                atomic_read(&exp->exp_refcount) - 1);
840
841         if (atomic_dec_and_test(&exp->exp_refcount)) {
842                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
843                 CDEBUG(D_IOCTL, "final put %p/%s\n",
844                        exp, exp->exp_client_uuid.uuid);
845
846                 /* release nid stat refererence */
847                 lprocfs_exp_cleanup(exp);
848
849                 obd_zombie_export_add(exp);
850         }
851 }
852 EXPORT_SYMBOL(class_export_put);
853
854 /* Creates a new export, adds it to the hash table, and returns a
855  * pointer to it. The refcount is 2: one for the hash reference, and
856  * one for the pointer returned by this function. */
857 struct obd_export *class_new_export(struct obd_device *obd,
858                                     struct obd_uuid *cluuid)
859 {
860         struct obd_export *export;
861         cfs_hash_t *hash = NULL;
862         int rc = 0;
863         ENTRY;
864
865         OBD_ALLOC_PTR(export);
866         if (!export)
867                 return ERR_PTR(-ENOMEM);
868
869         export->exp_conn_cnt = 0;
870         export->exp_lock_hash = NULL;
871         export->exp_flock_hash = NULL;
872         atomic_set(&export->exp_refcount, 2);
873         atomic_set(&export->exp_rpc_count, 0);
874         atomic_set(&export->exp_cb_count, 0);
875         atomic_set(&export->exp_locks_count, 0);
876 #if LUSTRE_TRACKS_LOCK_EXP_REFS
877         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
878         spin_lock_init(&export->exp_locks_list_guard);
879 #endif
880         atomic_set(&export->exp_replay_count, 0);
881         export->exp_obd = obd;
882         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
883         spin_lock_init(&export->exp_uncommitted_replies_lock);
884         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
885         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
886         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
887         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
888         CFS_INIT_LIST_HEAD(&export->exp_reg_rpcs);
889         class_handle_hash(&export->exp_handle, &export_handle_ops);
890         export->exp_last_request_time = cfs_time_current_sec();
891         spin_lock_init(&export->exp_lock);
892         spin_lock_init(&export->exp_rpc_lock);
893         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
894         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
895         spin_lock_init(&export->exp_bl_list_lock);
896         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
897
898         export->exp_sp_peer = LUSTRE_SP_ANY;
899         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
900         export->exp_client_uuid = *cluuid;
901         obd_init_export(export);
902
903         spin_lock(&obd->obd_dev_lock);
904         /* shouldn't happen, but might race */
905         if (obd->obd_stopping)
906                 GOTO(exit_unlock, rc = -ENODEV);
907
908         hash = cfs_hash_getref(obd->obd_uuid_hash);
909         if (hash == NULL)
910                 GOTO(exit_unlock, rc = -ENODEV);
911         spin_unlock(&obd->obd_dev_lock);
912
913         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
914                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
915                 if (rc != 0) {
916                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
917                                       obd->obd_name, cluuid->uuid, rc);
918                         GOTO(exit_err, rc = -EALREADY);
919                 }
920         }
921
922         spin_lock(&obd->obd_dev_lock);
923         if (obd->obd_stopping) {
924                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
925                 GOTO(exit_unlock, rc = -ENODEV);
926         }
927
928         class_incref(obd, "export", export);
929         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
930         cfs_list_add_tail(&export->exp_obd_chain_timed,
931                           &export->exp_obd->obd_exports_timed);
932         export->exp_obd->obd_num_exports++;
933         spin_unlock(&obd->obd_dev_lock);
934         cfs_hash_putref(hash);
935         RETURN(export);
936
937 exit_unlock:
938         spin_unlock(&obd->obd_dev_lock);
939 exit_err:
940         if (hash)
941                 cfs_hash_putref(hash);
942         class_handle_unhash(&export->exp_handle);
943         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
944         obd_destroy_export(export);
945         OBD_FREE_PTR(export);
946         return ERR_PTR(rc);
947 }
948 EXPORT_SYMBOL(class_new_export);
949
950 void class_unlink_export(struct obd_export *exp)
951 {
952         class_handle_unhash(&exp->exp_handle);
953
954         spin_lock(&exp->exp_obd->obd_dev_lock);
955         /* delete an uuid-export hashitem from hashtables */
956         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
957                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
958                              &exp->exp_client_uuid,
959                              &exp->exp_uuid_hash);
960
961         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
962         cfs_list_del_init(&exp->exp_obd_chain_timed);
963         exp->exp_obd->obd_num_exports--;
964         spin_unlock(&exp->exp_obd->obd_dev_lock);
965         class_export_put(exp);
966 }
967 EXPORT_SYMBOL(class_unlink_export);
968
969 /* Import management functions */
970 void class_import_destroy(struct obd_import *imp)
971 {
972         ENTRY;
973
974         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
975                 imp->imp_obd->obd_name);
976
977         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
978
979         ptlrpc_put_connection_superhack(imp->imp_connection);
980
981         while (!cfs_list_empty(&imp->imp_conn_list)) {
982                 struct obd_import_conn *imp_conn;
983
984                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
985                                           struct obd_import_conn, oic_item);
986                 cfs_list_del_init(&imp_conn->oic_item);
987                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
988                 OBD_FREE(imp_conn, sizeof(*imp_conn));
989         }
990
991         LASSERT(imp->imp_sec == NULL);
992         class_decref(imp->imp_obd, "import", imp);
993         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
994         EXIT;
995 }
996
997 static void import_handle_addref(void *import)
998 {
999         class_import_get(import);
1000 }
1001
1002 static struct portals_handle_ops import_handle_ops = {
1003         .hop_addref = import_handle_addref,
1004         .hop_free   = NULL,
1005 };
1006
1007 struct obd_import *class_import_get(struct obd_import *import)
1008 {
1009         atomic_inc(&import->imp_refcount);
1010         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1011                atomic_read(&import->imp_refcount),
1012                import->imp_obd->obd_name);
1013         return import;
1014 }
1015 EXPORT_SYMBOL(class_import_get);
1016
1017 void class_import_put(struct obd_import *imp)
1018 {
1019         ENTRY;
1020
1021         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1022         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1023
1024         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1025                atomic_read(&imp->imp_refcount) - 1,
1026                imp->imp_obd->obd_name);
1027
1028         if (atomic_dec_and_test(&imp->imp_refcount)) {
1029                 CDEBUG(D_INFO, "final put import %p\n", imp);
1030                 obd_zombie_import_add(imp);
1031         }
1032
1033         /* catch possible import put race */
1034         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1035         EXIT;
1036 }
1037 EXPORT_SYMBOL(class_import_put);
1038
1039 static void init_imp_at(struct imp_at *at) {
1040         int i;
1041         at_init(&at->iat_net_latency, 0, 0);
1042         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1043                 /* max service estimates are tracked on the server side, so
1044                    don't use the AT history here, just use the last reported
1045                    val. (But keep hist for proc histogram, worst_ever) */
1046                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1047                         AT_FLG_NOHIST);
1048         }
1049 }
1050
1051 struct obd_import *class_new_import(struct obd_device *obd)
1052 {
1053         struct obd_import *imp;
1054
1055         OBD_ALLOC(imp, sizeof(*imp));
1056         if (imp == NULL)
1057                 return NULL;
1058
1059         CFS_INIT_LIST_HEAD(&imp->imp_pinger_chain);
1060         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1061         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1062         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1063         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1064         CFS_INIT_LIST_HEAD(&imp->imp_committed_list);
1065         imp->imp_replay_cursor = &imp->imp_committed_list;
1066         spin_lock_init(&imp->imp_lock);
1067         imp->imp_last_success_conn = 0;
1068         imp->imp_state = LUSTRE_IMP_NEW;
1069         imp->imp_obd = class_incref(obd, "import", imp);
1070         mutex_init(&imp->imp_sec_mutex);
1071         init_waitqueue_head(&imp->imp_recovery_waitq);
1072
1073         atomic_set(&imp->imp_refcount, 2);
1074         atomic_set(&imp->imp_unregistering, 0);
1075         atomic_set(&imp->imp_inflight, 0);
1076         atomic_set(&imp->imp_replay_inflight, 0);
1077         atomic_set(&imp->imp_inval_count, 0);
1078         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1079         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1080         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1081         init_imp_at(&imp->imp_at);
1082
1083         /* the default magic is V2, will be used in connect RPC, and
1084          * then adjusted according to the flags in request/reply. */
1085         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1086
1087         return imp;
1088 }
1089 EXPORT_SYMBOL(class_new_import);
1090
1091 void class_destroy_import(struct obd_import *import)
1092 {
1093         LASSERT(import != NULL);
1094         LASSERT(import != LP_POISON);
1095
1096         class_handle_unhash(&import->imp_handle);
1097
1098         spin_lock(&import->imp_lock);
1099         import->imp_generation++;
1100         spin_unlock(&import->imp_lock);
1101         class_import_put(import);
1102 }
1103 EXPORT_SYMBOL(class_destroy_import);
1104
1105 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1106
1107 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1108 {
1109         spin_lock(&exp->exp_locks_list_guard);
1110
1111         LASSERT(lock->l_exp_refs_nr >= 0);
1112
1113         if (lock->l_exp_refs_target != NULL &&
1114             lock->l_exp_refs_target != exp) {
1115                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1116                               exp, lock, lock->l_exp_refs_target);
1117         }
1118         if ((lock->l_exp_refs_nr ++) == 0) {
1119                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1120                 lock->l_exp_refs_target = exp;
1121         }
1122         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1123                lock, exp, lock->l_exp_refs_nr);
1124         spin_unlock(&exp->exp_locks_list_guard);
1125 }
1126 EXPORT_SYMBOL(__class_export_add_lock_ref);
1127
1128 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1129 {
1130         spin_lock(&exp->exp_locks_list_guard);
1131         LASSERT(lock->l_exp_refs_nr > 0);
1132         if (lock->l_exp_refs_target != exp) {
1133                 LCONSOLE_WARN("lock %p, "
1134                               "mismatching export pointers: %p, %p\n",
1135                               lock, lock->l_exp_refs_target, exp);
1136         }
1137         if (-- lock->l_exp_refs_nr == 0) {
1138                 cfs_list_del_init(&lock->l_exp_refs_link);
1139                 lock->l_exp_refs_target = NULL;
1140         }
1141         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1142                lock, exp, lock->l_exp_refs_nr);
1143         spin_unlock(&exp->exp_locks_list_guard);
1144 }
1145 EXPORT_SYMBOL(__class_export_del_lock_ref);
1146 #endif
1147
1148 /* A connection defines an export context in which preallocation can
1149    be managed. This releases the export pointer reference, and returns
1150    the export handle, so the export refcount is 1 when this function
1151    returns. */
1152 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1153                   struct obd_uuid *cluuid)
1154 {
1155         struct obd_export *export;
1156         LASSERT(conn != NULL);
1157         LASSERT(obd != NULL);
1158         LASSERT(cluuid != NULL);
1159         ENTRY;
1160
1161         export = class_new_export(obd, cluuid);
1162         if (IS_ERR(export))
1163                 RETURN(PTR_ERR(export));
1164
1165         conn->cookie = export->exp_handle.h_cookie;
1166         class_export_put(export);
1167
1168         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1169                cluuid->uuid, conn->cookie);
1170         RETURN(0);
1171 }
1172 EXPORT_SYMBOL(class_connect);
1173
1174 /* if export is involved in recovery then clean up related things */
1175 void class_export_recovery_cleanup(struct obd_export *exp)
1176 {
1177         struct obd_device *obd = exp->exp_obd;
1178
1179         spin_lock(&obd->obd_recovery_task_lock);
1180         if (exp->exp_delayed)
1181                 obd->obd_delayed_clients--;
1182         if (obd->obd_recovering) {
1183                 if (exp->exp_in_recovery) {
1184                         spin_lock(&exp->exp_lock);
1185                         exp->exp_in_recovery = 0;
1186                         spin_unlock(&exp->exp_lock);
1187                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1188                         atomic_dec(&obd->obd_connected_clients);
1189                 }
1190
1191                 /* if called during recovery then should update
1192                  * obd_stale_clients counter,
1193                  * lightweight exports are not counted */
1194                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1195                         exp->exp_obd->obd_stale_clients++;
1196         }
1197         spin_unlock(&obd->obd_recovery_task_lock);
1198         /** Cleanup req replay fields */
1199         if (exp->exp_req_replay_needed) {
1200                 spin_lock(&exp->exp_lock);
1201                 exp->exp_req_replay_needed = 0;
1202                 spin_unlock(&exp->exp_lock);
1203                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1204                 atomic_dec(&obd->obd_req_replay_clients);
1205         }
1206         /** Cleanup lock replay data */
1207         if (exp->exp_lock_replay_needed) {
1208                 spin_lock(&exp->exp_lock);
1209                 exp->exp_lock_replay_needed = 0;
1210                 spin_unlock(&exp->exp_lock);
1211                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1212                 atomic_dec(&obd->obd_lock_replay_clients);
1213         }
1214 }
1215
1216 /* This function removes 1-3 references from the export:
1217  * 1 - for export pointer passed
1218  * and if disconnect really need
1219  * 2 - removing from hash
1220  * 3 - in client_unlink_export
1221  * The export pointer passed to this function can destroyed */
1222 int class_disconnect(struct obd_export *export)
1223 {
1224         int already_disconnected;
1225         ENTRY;
1226
1227         if (export == NULL) {
1228                 CWARN("attempting to free NULL export %p\n", export);
1229                 RETURN(-EINVAL);
1230         }
1231
1232         spin_lock(&export->exp_lock);
1233         already_disconnected = export->exp_disconnected;
1234         export->exp_disconnected = 1;
1235         spin_unlock(&export->exp_lock);
1236
1237         /* class_cleanup(), abort_recovery(), and class_fail_export()
1238          * all end up in here, and if any of them race we shouldn't
1239          * call extra class_export_puts(). */
1240         if (already_disconnected) {
1241                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1242                 GOTO(no_disconn, already_disconnected);
1243         }
1244
1245         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1246                export->exp_handle.h_cookie);
1247
1248         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1249                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1250                              &export->exp_connection->c_peer.nid,
1251                              &export->exp_nid_hash);
1252
1253         class_export_recovery_cleanup(export);
1254         class_unlink_export(export);
1255 no_disconn:
1256         class_export_put(export);
1257         RETURN(0);
1258 }
1259 EXPORT_SYMBOL(class_disconnect);
1260
1261 /* Return non-zero for a fully connected export */
1262 int class_connected_export(struct obd_export *exp)
1263 {
1264         int connected = 0;
1265
1266         if (exp) {
1267                 spin_lock(&exp->exp_lock);
1268                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1269                 spin_unlock(&exp->exp_lock);
1270         }
1271         return connected;
1272 }
1273 EXPORT_SYMBOL(class_connected_export);
1274
1275 static void class_disconnect_export_list(cfs_list_t *list,
1276                                          enum obd_option flags)
1277 {
1278         int rc;
1279         struct obd_export *exp;
1280         ENTRY;
1281
1282         /* It's possible that an export may disconnect itself, but
1283          * nothing else will be added to this list. */
1284         while (!cfs_list_empty(list)) {
1285                 exp = cfs_list_entry(list->next, struct obd_export,
1286                                      exp_obd_chain);
1287                 /* need for safe call CDEBUG after obd_disconnect */
1288                 class_export_get(exp);
1289
1290                 spin_lock(&exp->exp_lock);
1291                 exp->exp_flags = flags;
1292                 spin_unlock(&exp->exp_lock);
1293
1294                 if (obd_uuid_equals(&exp->exp_client_uuid,
1295                                     &exp->exp_obd->obd_uuid)) {
1296                         CDEBUG(D_HA,
1297                                "exp %p export uuid == obd uuid, don't discon\n",
1298                                exp);
1299                         /* Need to delete this now so we don't end up pointing
1300                          * to work_list later when this export is cleaned up. */
1301                         cfs_list_del_init(&exp->exp_obd_chain);
1302                         class_export_put(exp);
1303                         continue;
1304                 }
1305
1306                 class_export_get(exp);
1307                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1308                        "last request at "CFS_TIME_T"\n",
1309                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1310                        exp, exp->exp_last_request_time);
1311                 /* release one export reference anyway */
1312                 rc = obd_disconnect(exp);
1313
1314                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1315                        obd_export_nid2str(exp), exp, rc);
1316                 class_export_put(exp);
1317         }
1318         EXIT;
1319 }
1320
1321 void class_disconnect_exports(struct obd_device *obd)
1322 {
1323         cfs_list_t work_list;
1324         ENTRY;
1325
1326         /* Move all of the exports from obd_exports to a work list, en masse. */
1327         CFS_INIT_LIST_HEAD(&work_list);
1328         spin_lock(&obd->obd_dev_lock);
1329         cfs_list_splice_init(&obd->obd_exports, &work_list);
1330         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1331         spin_unlock(&obd->obd_dev_lock);
1332
1333         if (!cfs_list_empty(&work_list)) {
1334                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1335                        "disconnecting them\n", obd->obd_minor, obd);
1336                 class_disconnect_export_list(&work_list,
1337                                              exp_flags_from_obd(obd));
1338         } else
1339                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1340                        obd->obd_minor, obd);
1341         EXIT;
1342 }
1343 EXPORT_SYMBOL(class_disconnect_exports);
1344
1345 /* Remove exports that have not completed recovery.
1346  */
1347 void class_disconnect_stale_exports(struct obd_device *obd,
1348                                     int (*test_export)(struct obd_export *))
1349 {
1350         cfs_list_t work_list;
1351         struct obd_export *exp, *n;
1352         int evicted = 0;
1353         ENTRY;
1354
1355         CFS_INIT_LIST_HEAD(&work_list);
1356         spin_lock(&obd->obd_dev_lock);
1357         cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1358                                      exp_obd_chain) {
1359                 /* don't count self-export as client */
1360                 if (obd_uuid_equals(&exp->exp_client_uuid,
1361                                     &exp->exp_obd->obd_uuid))
1362                         continue;
1363
1364                 /* don't evict clients which have no slot in last_rcvd
1365                  * (e.g. lightweight connection) */
1366                 if (exp->exp_target_data.ted_lr_idx == -1)
1367                         continue;
1368
1369                 spin_lock(&exp->exp_lock);
1370                 if (exp->exp_failed || test_export(exp)) {
1371                         spin_unlock(&exp->exp_lock);
1372                         continue;
1373                 }
1374                 exp->exp_failed = 1;
1375                 spin_unlock(&exp->exp_lock);
1376
1377                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1378                 evicted++;
1379                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1380                        obd->obd_name, exp->exp_client_uuid.uuid,
1381                        exp->exp_connection == NULL ? "<unknown>" :
1382                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1383                 print_export_data(exp, "EVICTING", 0);
1384         }
1385         spin_unlock(&obd->obd_dev_lock);
1386
1387         if (evicted)
1388                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1389                               obd->obd_name, evicted);
1390
1391         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1392                                                  OBD_OPT_ABORT_RECOV);
1393         EXIT;
1394 }
1395 EXPORT_SYMBOL(class_disconnect_stale_exports);
1396
1397 void class_fail_export(struct obd_export *exp)
1398 {
1399         int rc, already_failed;
1400
1401         spin_lock(&exp->exp_lock);
1402         already_failed = exp->exp_failed;
1403         exp->exp_failed = 1;
1404         spin_unlock(&exp->exp_lock);
1405
1406         if (already_failed) {
1407                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1408                        exp, exp->exp_client_uuid.uuid);
1409                 return;
1410         }
1411
1412         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1413                exp, exp->exp_client_uuid.uuid);
1414
1415         if (obd_dump_on_timeout)
1416                 libcfs_debug_dumplog();
1417
1418         /* need for safe call CDEBUG after obd_disconnect */
1419         class_export_get(exp);
1420
1421         /* Most callers into obd_disconnect are removing their own reference
1422          * (request, for example) in addition to the one from the hash table.
1423          * We don't have such a reference here, so make one. */
1424         class_export_get(exp);
1425         rc = obd_disconnect(exp);
1426         if (rc)
1427                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1428         else
1429                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1430                        exp, exp->exp_client_uuid.uuid);
1431         class_export_put(exp);
1432 }
1433 EXPORT_SYMBOL(class_fail_export);
1434
1435 char *obd_export_nid2str(struct obd_export *exp)
1436 {
1437         if (exp->exp_connection != NULL)
1438                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1439
1440         return "(no nid)";
1441 }
1442 EXPORT_SYMBOL(obd_export_nid2str);
1443
1444 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1445 {
1446         cfs_hash_t *nid_hash;
1447         struct obd_export *doomed_exp = NULL;
1448         int exports_evicted = 0;
1449
1450         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1451
1452         spin_lock(&obd->obd_dev_lock);
1453         /* umount has run already, so evict thread should leave
1454          * its task to umount thread now */
1455         if (obd->obd_stopping) {
1456                 spin_unlock(&obd->obd_dev_lock);
1457                 return exports_evicted;
1458         }
1459         nid_hash = obd->obd_nid_hash;
1460         cfs_hash_getref(nid_hash);
1461         spin_unlock(&obd->obd_dev_lock);
1462
1463         do {
1464                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1465                 if (doomed_exp == NULL)
1466                         break;
1467
1468                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1469                          "nid %s found, wanted nid %s, requested nid %s\n",
1470                          obd_export_nid2str(doomed_exp),
1471                          libcfs_nid2str(nid_key), nid);
1472                 LASSERTF(doomed_exp != obd->obd_self_export,
1473                          "self-export is hashed by NID?\n");
1474                 exports_evicted++;
1475                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1476                               "request\n", obd->obd_name,
1477                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1478                               obd_export_nid2str(doomed_exp));
1479                 class_fail_export(doomed_exp);
1480                 class_export_put(doomed_exp);
1481         } while (1);
1482
1483         cfs_hash_putref(nid_hash);
1484
1485         if (!exports_evicted)
1486                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1487                        obd->obd_name, nid);
1488         return exports_evicted;
1489 }
1490 EXPORT_SYMBOL(obd_export_evict_by_nid);
1491
1492 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1493 {
1494         cfs_hash_t *uuid_hash;
1495         struct obd_export *doomed_exp = NULL;
1496         struct obd_uuid doomed_uuid;
1497         int exports_evicted = 0;
1498
1499         spin_lock(&obd->obd_dev_lock);
1500         if (obd->obd_stopping) {
1501                 spin_unlock(&obd->obd_dev_lock);
1502                 return exports_evicted;
1503         }
1504         uuid_hash = obd->obd_uuid_hash;
1505         cfs_hash_getref(uuid_hash);
1506         spin_unlock(&obd->obd_dev_lock);
1507
1508         obd_str2uuid(&doomed_uuid, uuid);
1509         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1510                 CERROR("%s: can't evict myself\n", obd->obd_name);
1511                 cfs_hash_putref(uuid_hash);
1512                 return exports_evicted;
1513         }
1514
1515         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1516
1517         if (doomed_exp == NULL) {
1518                 CERROR("%s: can't disconnect %s: no exports found\n",
1519                        obd->obd_name, uuid);
1520         } else {
1521                 CWARN("%s: evicting %s at adminstrative request\n",
1522                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1523                 class_fail_export(doomed_exp);
1524                 class_export_put(doomed_exp);
1525                 exports_evicted++;
1526         }
1527         cfs_hash_putref(uuid_hash);
1528
1529         return exports_evicted;
1530 }
1531 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1532
1533 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1534 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1535 EXPORT_SYMBOL(class_export_dump_hook);
1536 #endif
1537
1538 static void print_export_data(struct obd_export *exp, const char *status,
1539                               int locks)
1540 {
1541         struct ptlrpc_reply_state *rs;
1542         struct ptlrpc_reply_state *first_reply = NULL;
1543         int nreplies = 0;
1544
1545         spin_lock(&exp->exp_lock);
1546         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1547                                 rs_exp_list) {
1548                 if (nreplies == 0)
1549                         first_reply = rs;
1550                 nreplies++;
1551         }
1552         spin_unlock(&exp->exp_lock);
1553
1554         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1555                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1556                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1557                atomic_read(&exp->exp_rpc_count),
1558                atomic_read(&exp->exp_cb_count),
1559                atomic_read(&exp->exp_locks_count),
1560                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1561                nreplies, first_reply, nreplies > 3 ? "..." : "",
1562                exp->exp_last_committed);
1563 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1564         if (locks && class_export_dump_hook != NULL)
1565                 class_export_dump_hook(exp);
1566 #endif
1567 }
1568
1569 void dump_exports(struct obd_device *obd, int locks)
1570 {
1571         struct obd_export *exp;
1572
1573         spin_lock(&obd->obd_dev_lock);
1574         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1575                 print_export_data(exp, "ACTIVE", locks);
1576         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1577                 print_export_data(exp, "UNLINKED", locks);
1578         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1579                 print_export_data(exp, "DELAYED", locks);
1580         spin_unlock(&obd->obd_dev_lock);
1581         spin_lock(&obd_zombie_impexp_lock);
1582         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1583                 print_export_data(exp, "ZOMBIE", locks);
1584         spin_unlock(&obd_zombie_impexp_lock);
1585 }
1586 EXPORT_SYMBOL(dump_exports);
1587
1588 void obd_exports_barrier(struct obd_device *obd)
1589 {
1590         int waited = 2;
1591         LASSERT(cfs_list_empty(&obd->obd_exports));
1592         spin_lock(&obd->obd_dev_lock);
1593         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1594                 spin_unlock(&obd->obd_dev_lock);
1595                 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1596                                                    cfs_time_seconds(waited));
1597                 if (waited > 5 && IS_PO2(waited)) {
1598                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1599                                       "more than %d seconds. "
1600                                       "The obd refcount = %d. Is it stuck?\n",
1601                                       obd->obd_name, waited,
1602                                       atomic_read(&obd->obd_refcount));
1603                         dump_exports(obd, 1);
1604                 }
1605                 waited *= 2;
1606                 spin_lock(&obd->obd_dev_lock);
1607         }
1608         spin_unlock(&obd->obd_dev_lock);
1609 }
1610 EXPORT_SYMBOL(obd_exports_barrier);
1611
1612 /* Total amount of zombies to be destroyed */
1613 static int zombies_count = 0;
1614
1615 /**
1616  * kill zombie imports and exports
1617  */
1618 void obd_zombie_impexp_cull(void)
1619 {
1620         struct obd_import *import;
1621         struct obd_export *export;
1622         ENTRY;
1623
1624         do {
1625                 spin_lock(&obd_zombie_impexp_lock);
1626
1627                 import = NULL;
1628                 if (!cfs_list_empty(&obd_zombie_imports)) {
1629                         import = cfs_list_entry(obd_zombie_imports.next,
1630                                                 struct obd_import,
1631                                                 imp_zombie_chain);
1632                         cfs_list_del_init(&import->imp_zombie_chain);
1633                 }
1634
1635                 export = NULL;
1636                 if (!cfs_list_empty(&obd_zombie_exports)) {
1637                         export = cfs_list_entry(obd_zombie_exports.next,
1638                                                 struct obd_export,
1639                                                 exp_obd_chain);
1640                         cfs_list_del_init(&export->exp_obd_chain);
1641                 }
1642
1643                 spin_unlock(&obd_zombie_impexp_lock);
1644
1645                 if (import != NULL) {
1646                         class_import_destroy(import);
1647                         spin_lock(&obd_zombie_impexp_lock);
1648                         zombies_count--;
1649                         spin_unlock(&obd_zombie_impexp_lock);
1650                 }
1651
1652                 if (export != NULL) {
1653                         class_export_destroy(export);
1654                         spin_lock(&obd_zombie_impexp_lock);
1655                         zombies_count--;
1656                         spin_unlock(&obd_zombie_impexp_lock);
1657                 }
1658
1659                 cond_resched();
1660         } while (import != NULL || export != NULL);
1661         EXIT;
1662 }
1663
1664 static struct completion        obd_zombie_start;
1665 static struct completion        obd_zombie_stop;
1666 static unsigned long            obd_zombie_flags;
1667 static wait_queue_head_t        obd_zombie_waitq;
1668 static pid_t                    obd_zombie_pid;
1669
1670 enum {
1671         OBD_ZOMBIE_STOP         = 0x0001,
1672 };
1673
1674 /**
1675  * check for work for kill zombie import/export thread.
1676  */
1677 static int obd_zombie_impexp_check(void *arg)
1678 {
1679         int rc;
1680
1681         spin_lock(&obd_zombie_impexp_lock);
1682         rc = (zombies_count == 0) &&
1683              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1684         spin_unlock(&obd_zombie_impexp_lock);
1685
1686         RETURN(rc);
1687 }
1688
1689 /**
1690  * Add export to the obd_zombe thread and notify it.
1691  */
1692 static void obd_zombie_export_add(struct obd_export *exp) {
1693         spin_lock(&exp->exp_obd->obd_dev_lock);
1694         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1695         cfs_list_del_init(&exp->exp_obd_chain);
1696         spin_unlock(&exp->exp_obd->obd_dev_lock);
1697         spin_lock(&obd_zombie_impexp_lock);
1698         zombies_count++;
1699         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1700         spin_unlock(&obd_zombie_impexp_lock);
1701
1702         obd_zombie_impexp_notify();
1703 }
1704
1705 /**
1706  * Add import to the obd_zombe thread and notify it.
1707  */
1708 static void obd_zombie_import_add(struct obd_import *imp) {
1709         LASSERT(imp->imp_sec == NULL);
1710         LASSERT(imp->imp_rq_pool == NULL);
1711         spin_lock(&obd_zombie_impexp_lock);
1712         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1713         zombies_count++;
1714         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1715         spin_unlock(&obd_zombie_impexp_lock);
1716
1717         obd_zombie_impexp_notify();
1718 }
1719
1720 /**
1721  * notify import/export destroy thread about new zombie.
1722  */
1723 static void obd_zombie_impexp_notify(void)
1724 {
1725         /*
1726          * Make sure obd_zomebie_impexp_thread get this notification.
1727          * It is possible this signal only get by obd_zombie_barrier, and
1728          * barrier gulps this notification and sleeps away and hangs ensues
1729          */
1730         wake_up_all(&obd_zombie_waitq);
1731 }
1732
1733 /**
1734  * check whether obd_zombie is idle
1735  */
1736 static int obd_zombie_is_idle(void)
1737 {
1738         int rc;
1739
1740         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1741         spin_lock(&obd_zombie_impexp_lock);
1742         rc = (zombies_count == 0);
1743         spin_unlock(&obd_zombie_impexp_lock);
1744         return rc;
1745 }
1746
1747 /**
1748  * wait when obd_zombie import/export queues become empty
1749  */
1750 void obd_zombie_barrier(void)
1751 {
1752         struct l_wait_info lwi = { 0 };
1753
1754         if (obd_zombie_pid == current_pid())
1755                 /* don't wait for myself */
1756                 return;
1757         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1758 }
1759 EXPORT_SYMBOL(obd_zombie_barrier);
1760
1761 #ifdef __KERNEL__
1762
1763 /**
1764  * destroy zombie export/import thread.
1765  */
1766 static int obd_zombie_impexp_thread(void *unused)
1767 {
1768         unshare_fs_struct();
1769         complete(&obd_zombie_start);
1770
1771         obd_zombie_pid = current_pid();
1772
1773         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1774                 struct l_wait_info lwi = { 0 };
1775
1776                 l_wait_event(obd_zombie_waitq,
1777                              !obd_zombie_impexp_check(NULL), &lwi);
1778                 obd_zombie_impexp_cull();
1779
1780                 /*
1781                  * Notify obd_zombie_barrier callers that queues
1782                  * may be empty.
1783                  */
1784                 wake_up(&obd_zombie_waitq);
1785         }
1786
1787         complete(&obd_zombie_stop);
1788
1789         RETURN(0);
1790 }
1791
1792 #else /* ! KERNEL */
1793
1794 static atomic_t zombie_recur = ATOMIC_INIT(0);
1795 static void *obd_zombie_impexp_work_cb;
1796 static void *obd_zombie_impexp_idle_cb;
1797
1798 int obd_zombie_impexp_kill(void *arg)
1799 {
1800         int rc = 0;
1801
1802         if (atomic_inc_return(&zombie_recur) == 1) {
1803                 obd_zombie_impexp_cull();
1804                 rc = 1;
1805         }
1806         atomic_dec(&zombie_recur);
1807         return rc;
1808 }
1809
1810 #endif
1811
1812 /**
1813  * start destroy zombie import/export thread
1814  */
1815 int obd_zombie_impexp_init(void)
1816 {
1817 #ifdef __KERNEL__
1818         struct task_struct *task;
1819 #endif
1820
1821         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1822         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1823         spin_lock_init(&obd_zombie_impexp_lock);
1824         init_completion(&obd_zombie_start);
1825         init_completion(&obd_zombie_stop);
1826         init_waitqueue_head(&obd_zombie_waitq);
1827         obd_zombie_pid = 0;
1828
1829 #ifdef __KERNEL__
1830         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1831         if (IS_ERR(task))
1832                 RETURN(PTR_ERR(task));
1833
1834         wait_for_completion(&obd_zombie_start);
1835 #else
1836
1837         obd_zombie_impexp_work_cb =
1838                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1839                                                  &obd_zombie_impexp_kill, NULL);
1840
1841         obd_zombie_impexp_idle_cb =
1842                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1843                                                  &obd_zombie_impexp_check, NULL);
1844 #endif
1845         RETURN(0);
1846 }
1847 /**
1848  * stop destroy zombie import/export thread
1849  */
1850 void obd_zombie_impexp_stop(void)
1851 {
1852         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1853         obd_zombie_impexp_notify();
1854 #ifdef __KERNEL__
1855         wait_for_completion(&obd_zombie_stop);
1856 #else
1857         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1858         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1859 #endif
1860 }
1861
1862 /***** Kernel-userspace comm helpers *******/
1863
1864 /* Get length of entire message, including header */
1865 int kuc_len(int payload_len)
1866 {
1867         return sizeof(struct kuc_hdr) + payload_len;
1868 }
1869 EXPORT_SYMBOL(kuc_len);
1870
1871 /* Get a pointer to kuc header, given a ptr to the payload
1872  * @param p Pointer to payload area
1873  * @returns Pointer to kuc header
1874  */
1875 struct kuc_hdr * kuc_ptr(void *p)
1876 {
1877         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1878         LASSERT(lh->kuc_magic == KUC_MAGIC);
1879         return lh;
1880 }
1881 EXPORT_SYMBOL(kuc_ptr);
1882
1883 /* Test if payload is part of kuc message
1884  * @param p Pointer to payload area
1885  * @returns boolean
1886  */
1887 int kuc_ispayload(void *p)
1888 {
1889         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1890
1891         if (kh->kuc_magic == KUC_MAGIC)
1892                 return 1;
1893         else
1894                 return 0;
1895 }
1896 EXPORT_SYMBOL(kuc_ispayload);
1897
1898 /* Alloc space for a message, and fill in header
1899  * @return Pointer to payload area
1900  */
1901 void *kuc_alloc(int payload_len, int transport, int type)
1902 {
1903         struct kuc_hdr *lh;
1904         int len = kuc_len(payload_len);
1905
1906         OBD_ALLOC(lh, len);
1907         if (lh == NULL)
1908                 return ERR_PTR(-ENOMEM);
1909
1910         lh->kuc_magic = KUC_MAGIC;
1911         lh->kuc_transport = transport;
1912         lh->kuc_msgtype = type;
1913         lh->kuc_msglen = len;
1914
1915         return (void *)(lh + 1);
1916 }
1917 EXPORT_SYMBOL(kuc_alloc);
1918
1919 /* Takes pointer to payload area */
1920 inline void kuc_free(void *p, int payload_len)
1921 {
1922         struct kuc_hdr *lh = kuc_ptr(p);
1923         OBD_FREE(lh, kuc_len(payload_len));
1924 }
1925 EXPORT_SYMBOL(kuc_free);
1926
1927
1928