Whamcloud - gitweb
bf8b6cea6cca33ce8133d6be52ea84f0468d8147
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_class.h>
47 #include <lprocfs_status.h>
48
/* List of registered obd_type entries (defined in another file).  Within
 * this file it is walked and modified only with obd_types_lock held. */
extern cfs_list_t obd_types;
spinlock_t obd_types_lock;

/* Slab caches created by obd_init_caches() and destroyed by
 * obd_cleanup_caches().  obd_device_cachep backs obd_device_alloc() and
 * obd_device_free(). */
struct kmem_cache *obd_device_cachep;
struct kmem_cache *obdo_cachep;
EXPORT_SYMBOL(obdo_cachep);
struct kmem_cache *import_cachep;

/* Zombie import/export bookkeeping.
 * NOTE(review): presumably these lists hold objects awaiting deferred
 * destruction and are guarded by obd_zombie_impexp_lock -- their users lie
 * outside this part of the file; confirm there. */
cfs_list_t      obd_zombie_imports;
cfs_list_t      obd_zombie_exports;
spinlock_t  obd_zombie_impexp_lock;
/* Forward declarations of static helpers defined elsewhere in this file. */
static void obd_zombie_impexp_notify(void);
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks);

/* Hook invoked by class_export_destroy() on exp->exp_connection when an
 * export that has a connection is destroyed.
 * NOTE(review): it is called without a NULL check, so it is presumably
 * installed by another module before any such export exists -- confirm. */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
68
69 /*
70  * support functions: we could use inter-module communication, but this
71  * is more portable to other OS's
72  */
73 static struct obd_device *obd_device_alloc(void)
74 {
75         struct obd_device *obd;
76
77         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
78         if (obd != NULL) {
79                 obd->obd_magic = OBD_DEVICE_MAGIC;
80         }
81         return obd;
82 }
83
84 static void obd_device_free(struct obd_device *obd)
85 {
86         LASSERT(obd != NULL);
87         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
89         if (obd->obd_namespace != NULL) {
90                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91                        obd, obd->obd_namespace, obd->obd_force);
92                 LBUG();
93         }
94         lu_ref_fini(&obd->obd_reference);
95         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
96 }
97
98 struct obd_type *class_search_type(const char *name)
99 {
100         cfs_list_t *tmp;
101         struct obd_type *type;
102
103         spin_lock(&obd_types_lock);
104         cfs_list_for_each(tmp, &obd_types) {
105                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
106                 if (strcmp(type->typ_name, name) == 0) {
107                         spin_unlock(&obd_types_lock);
108                         return type;
109                 }
110         }
111         spin_unlock(&obd_types_lock);
112         return NULL;
113 }
114 EXPORT_SYMBOL(class_search_type);
115
116 struct obd_type *class_get_type(const char *name)
117 {
118         struct obd_type *type = class_search_type(name);
119
120 #ifdef HAVE_MODULE_LOADING_SUPPORT
121         if (!type) {
122                 const char *modname = name;
123
124                 if (strcmp(modname, "obdfilter") == 0)
125                         modname = "ofd";
126
127                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
128                         modname = LUSTRE_OSP_NAME;
129
130                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
131                         modname = LUSTRE_MDT_NAME;
132
133                 if (!request_module("%s", modname)) {
134                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
135                         type = class_search_type(name);
136                 } else {
137                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
138                                            modname);
139                 }
140         }
141 #endif
142         if (type) {
143                 spin_lock(&type->obd_type_lock);
144                 type->typ_refcnt++;
145                 try_module_get(type->typ_dt_ops->o_owner);
146                 spin_unlock(&type->obd_type_lock);
147         }
148         return type;
149 }
150 EXPORT_SYMBOL(class_get_type);
151
152 void class_put_type(struct obd_type *type)
153 {
154         LASSERT(type);
155         spin_lock(&type->obd_type_lock);
156         type->typ_refcnt--;
157         module_put(type->typ_dt_ops->o_owner);
158         spin_unlock(&type->obd_type_lock);
159 }
160 EXPORT_SYMBOL(class_put_type);
161
162 #define CLASS_MAX_NAME 1024
163
/**
 * Register a new obd type called \a name.
 *
 * Allocates an obd_type, copies \a dt_ops (and \a md_ops when given) into
 * it by value, registers a proc directory for it when LPROCFS is defined,
 * initialises \a ldt when given, and finally links the type onto obd_types.
 *
 * \param[in] dt_ops      device operations; copied into the new type
 * \param[in] md_ops      metadata operations; optional, copied when non-NULL
 * \param[in] module_vars passed to lprocfs_seq_register()
 * \param[in] vars        only without HAVE_ONLY_PROCFS_SEQ: when non-NULL,
 *                        lprocfs_register() is used with these instead
 * \param[in] name        type name; must be shorter than CLASS_MAX_NAME
 *                        (asserted)
 * \param[in] ldt         optional lu_device_type, initialised with
 *                        lu_device_type_init()
 *
 * \retval 0              success
 * \retval -EEXIST        a type with this name is already registered
 * \retval -ENOMEM        an allocation failed
 * \retval other          error from proc registration (PTR_ERR of the
 *                        returned entry) or from lu_device_type_init()
 */
int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
                        struct lprocfs_seq_vars *module_vars,
#ifndef HAVE_ONLY_PROCFS_SEQ
                        struct lprocfs_vars *vars,
#endif
                        const char *name, struct lu_device_type *ldt)
{
        struct obd_type *type;
        int rc = 0;
        ENTRY;

        /* sanity check */
        LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);

        /* NOTE(review): this lookup and the list insertion at the end are
         * separate obd_types_lock critical sections, so nothing here stops
         * two concurrent registrations of the same name -- confirm callers
         * serialise registration. */
        if (class_search_type(name)) {
                CDEBUG(D_IOCTL, "Type %s already registered\n", name);
                RETURN(-EEXIST);
        }

        /* every allocation failure below reports -ENOMEM */
        rc = -ENOMEM;
        OBD_ALLOC(type, sizeof(*type));
        if (type == NULL)
                RETURN(rc);

        /* allocate all three members first, then check them together; the
         * failed path frees whichever of them succeeded */
        OBD_ALLOC_PTR(type->typ_dt_ops);
        OBD_ALLOC_PTR(type->typ_md_ops);
        OBD_ALLOC(type->typ_name, strlen(name) + 1);

        if (type->typ_dt_ops == NULL ||
            type->typ_md_ops == NULL ||
            type->typ_name == NULL)
                GOTO (failed, rc);

        *(type->typ_dt_ops) = *dt_ops;
        /* md_ops is optional */
        if (md_ops)
                *(type->typ_md_ops) = *md_ops;
        strcpy(type->typ_name, name);
        spin_lock_init(&type->obd_type_lock);

#ifdef LPROCFS
#ifndef HAVE_ONLY_PROCFS_SEQ
        if (vars) {
                type->typ_procroot = lprocfs_register(type->typ_name,
                                                        proc_lustre_root,
                                                        vars, type);
        } else
#endif
        /* when HAVE_ONLY_PROCFS_SEQ is defined this block runs
         * unconditionally; otherwise it is the else-branch of "if (vars)" */
        {
                type->typ_procroot = lprocfs_seq_register(type->typ_name,
                                                        proc_lustre_root,
                                                        module_vars, type);
        }
        if (IS_ERR(type->typ_procroot)) {
                rc = PTR_ERR(type->typ_procroot);
                type->typ_procroot = NULL;
                GOTO (failed, rc);
        }
#endif
        if (ldt != NULL) {
                type->typ_lu = ldt;
                rc = lu_device_type_init(ldt);
                if (rc != 0)
                        GOTO (failed, rc);
        }

        /* publish the fully initialised type */
        spin_lock(&obd_types_lock);
        cfs_list_add(&type->typ_chain, &obd_types);
        spin_unlock(&obd_types_lock);

        RETURN (0);

 failed:
#ifdef LPROCFS
        /* NOTE(review): the proc entry is removed by name whenever typ_name
         * was allocated, including paths that never registered one (ops
         * allocation failure, or a failed registration) -- confirm this
         * cannot remove an entry owned by another module using the same
         * name. */
        if (type->typ_name != NULL) {
#ifndef HAVE_ONLY_PROCFS_SEQ
                lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
#else
                remove_proc_subtree(type->typ_name, proc_lustre_root);
#endif
        }
#endif
        if (type->typ_name != NULL)
                OBD_FREE(type->typ_name, strlen(name) + 1);
        if (type->typ_md_ops != NULL)
                OBD_FREE_PTR(type->typ_md_ops);
        if (type->typ_dt_ops != NULL)
                OBD_FREE_PTR(type->typ_dt_ops);
        OBD_FREE(type, sizeof(*type));
        RETURN(rc);
}
255 EXPORT_SYMBOL(class_register_type);
256
/**
 * Unregister the obd type called \a name and free it.
 *
 * Removes the type's proc entry (when LPROCFS is defined), finalises its
 * lu_device_type if it has one, unlinks it from obd_types and frees the
 * type together with its name and ops copies.
 *
 * \param[in] name      name of the type to unregister
 *
 * \retval 0            type removed
 * \retval -EINVAL      no type with this name is registered
 * \retval -EBUSY       the type still has references; its ops copies are
 *                      freed but the type itself stays registered
 */
int class_unregister_type(const char *name)
{
        struct obd_type *type = class_search_type(name);
        ENTRY;

        if (!type) {
                CERROR("unknown obd type\n");
                RETURN(-EINVAL);
        }

        /* NOTE(review): typ_refcnt is read here without obd_type_lock, while
         * class_get_type()/class_put_type() modify it under that lock. */
        if (type->typ_refcnt) {
                CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
                /* This is a bad situation, let's make the best of it */
                /* Remove ops, but leave the name for debugging */
                /* NOTE(review): the type stays on obd_types after its ops
                 * are freed, and class_get_type()/class_put_type()
                 * dereference typ_dt_ops; whether the pointers are reset
                 * depends on OBD_FREE_PTR -- confirm later users and a
                 * repeated unregister are safe. */
                OBD_FREE_PTR(type->typ_dt_ops);
                OBD_FREE_PTR(type->typ_md_ops);
                RETURN(-EBUSY);
        }

        /* we do not use type->typ_procroot as for compatibility purposes
         * other modules can share names (i.e. lod can use lov entry). so
         * we can't reference pointer as it can get invalided when another
         * module removes the entry */
#ifdef LPROCFS
#ifndef HAVE_ONLY_PROCFS_SEQ
        lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
#else
        remove_proc_subtree(type->typ_name, proc_lustre_root);
#endif
#endif
        if (type->typ_lu)
                lu_device_type_fini(type->typ_lu);

        spin_lock(&obd_types_lock);
        cfs_list_del(&type->typ_chain);
        spin_unlock(&obd_types_lock);
        /* the caller's name matched typ_name via strcmp() in the lookup, so
         * strlen(name) + 1 equals the size the name was allocated with */
        OBD_FREE(type->typ_name, strlen(name) + 1);
        if (type->typ_dt_ops != NULL)
                OBD_FREE_PTR(type->typ_dt_ops);
        if (type->typ_md_ops != NULL)
                OBD_FREE_PTR(type->typ_md_ops);
        OBD_FREE(type, sizeof(*type));
        RETURN(0);
} /* class_unregister_type */
301 EXPORT_SYMBOL(class_unregister_type);
302
303 /**
304  * Create a new obd device.
305  *
306  * Find an empty slot in ::obd_devs[], create a new obd device in it.
307  *
308  * \param[in] type_name obd device type string.
309  * \param[in] name      obd device name.
310  *
311  * \retval NULL if create fails, otherwise return the obd device
312  *         pointer created.
313  */
314 struct obd_device *class_newdev(const char *type_name, const char *name)
315 {
316         struct obd_device *result = NULL;
317         struct obd_device *newdev;
318         struct obd_type *type = NULL;
319         int i;
320         int new_obd_minor = 0;
321         ENTRY;
322
323         if (strlen(name) >= MAX_OBD_NAME) {
324                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
325                 RETURN(ERR_PTR(-EINVAL));
326         }
327
328         type = class_get_type(type_name);
329         if (type == NULL){
330                 CERROR("OBD: unknown type: %s\n", type_name);
331                 RETURN(ERR_PTR(-ENODEV));
332         }
333
334         newdev = obd_device_alloc();
335         if (newdev == NULL)
336                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
337
338         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
339
340         write_lock(&obd_dev_lock);
341         for (i = 0; i < class_devno_max(); i++) {
342                 struct obd_device *obd = class_num2obd(i);
343
344                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
345                         CERROR("Device %s already exists at %d, won't add\n",
346                                name, i);
347                         if (result) {
348                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
349                                          "%p obd_magic %08x != %08x\n", result,
350                                          result->obd_magic, OBD_DEVICE_MAGIC);
351                                 LASSERTF(result->obd_minor == new_obd_minor,
352                                          "%p obd_minor %d != %d\n", result,
353                                          result->obd_minor, new_obd_minor);
354
355                                 obd_devs[result->obd_minor] = NULL;
356                                 result->obd_name[0]='\0';
357                          }
358                         result = ERR_PTR(-EEXIST);
359                         break;
360                 }
361                 if (!result && !obd) {
362                         result = newdev;
363                         result->obd_minor = i;
364                         new_obd_minor = i;
365                         result->obd_type = type;
366                         strncpy(result->obd_name, name,
367                                 sizeof(result->obd_name) - 1);
368                         obd_devs[i] = result;
369                 }
370         }
371         write_unlock(&obd_dev_lock);
372
373         if (result == NULL && i >= class_devno_max()) {
374                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
375                        class_devno_max());
376                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
377         }
378
379         if (IS_ERR(result))
380                 GOTO(out, result);
381
382         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
383                result->obd_name, result);
384
385         RETURN(result);
386 out:
387         obd_device_free(newdev);
388 out_type:
389         class_put_type(type);
390         return result;
391 }
392
393 void class_release_dev(struct obd_device *obd)
394 {
395         struct obd_type *obd_type = obd->obd_type;
396
397         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
398                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
399         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
400                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
401         LASSERT(obd_type != NULL);
402
403         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
404                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
405
406         write_lock(&obd_dev_lock);
407         obd_devs[obd->obd_minor] = NULL;
408         write_unlock(&obd_dev_lock);
409         obd_device_free(obd);
410
411         class_put_type(obd_type);
412 }
413
414 int class_name2dev(const char *name)
415 {
416         int i;
417
418         if (!name)
419                 return -1;
420
421         read_lock(&obd_dev_lock);
422         for (i = 0; i < class_devno_max(); i++) {
423                 struct obd_device *obd = class_num2obd(i);
424
425                 if (obd && strcmp(name, obd->obd_name) == 0) {
426                         /* Make sure we finished attaching before we give
427                            out any references */
428                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
429                         if (obd->obd_attached) {
430                                 read_unlock(&obd_dev_lock);
431                                 return i;
432                         }
433                         break;
434                 }
435         }
436         read_unlock(&obd_dev_lock);
437
438         return -1;
439 }
440 EXPORT_SYMBOL(class_name2dev);
441
442 struct obd_device *class_name2obd(const char *name)
443 {
444         int dev = class_name2dev(name);
445
446         if (dev < 0 || dev > class_devno_max())
447                 return NULL;
448         return class_num2obd(dev);
449 }
450 EXPORT_SYMBOL(class_name2obd);
451
452 int class_uuid2dev(struct obd_uuid *uuid)
453 {
454         int i;
455
456         read_lock(&obd_dev_lock);
457         for (i = 0; i < class_devno_max(); i++) {
458                 struct obd_device *obd = class_num2obd(i);
459
460                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
461                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
462                         read_unlock(&obd_dev_lock);
463                         return i;
464                 }
465         }
466         read_unlock(&obd_dev_lock);
467
468         return -1;
469 }
470 EXPORT_SYMBOL(class_uuid2dev);
471
472 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
473 {
474         int dev = class_uuid2dev(uuid);
475         if (dev < 0)
476                 return NULL;
477         return class_num2obd(dev);
478 }
479 EXPORT_SYMBOL(class_uuid2obd);
480
481 /**
482  * Get obd device from ::obd_devs[]
483  *
484  * \param num [in] array index
485  *
486  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
487  *         otherwise return the obd device there.
488  */
489 struct obd_device *class_num2obd(int num)
490 {
491         struct obd_device *obd = NULL;
492
493         if (num < class_devno_max()) {
494                 obd = obd_devs[num];
495                 if (obd == NULL)
496                         return NULL;
497
498                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
499                          "%p obd_magic %08x != %08x\n",
500                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
501                 LASSERTF(obd->obd_minor == num,
502                          "%p obd_minor %0d != %0d\n",
503                          obd, obd->obd_minor, num);
504         }
505
506         return obd;
507 }
508 EXPORT_SYMBOL(class_num2obd);
509
510 /**
511  * Get obd devices count. Device in any
512  *    state are counted
513  * \retval obd device count
514  */
515 int get_devices_count(void)
516 {
517         int index, max_index = class_devno_max(), dev_count = 0;
518
519         read_lock(&obd_dev_lock);
520         for (index = 0; index <= max_index; index++) {
521                 struct obd_device *obd = class_num2obd(index);
522                 if (obd != NULL)
523                         dev_count++;
524         }
525         read_unlock(&obd_dev_lock);
526
527         return dev_count;
528 }
529 EXPORT_SYMBOL(get_devices_count);
530
531 void class_obd_list(void)
532 {
533         char *status;
534         int i;
535
536         read_lock(&obd_dev_lock);
537         for (i = 0; i < class_devno_max(); i++) {
538                 struct obd_device *obd = class_num2obd(i);
539
540                 if (obd == NULL)
541                         continue;
542                 if (obd->obd_stopping)
543                         status = "ST";
544                 else if (obd->obd_set_up)
545                         status = "UP";
546                 else if (obd->obd_attached)
547                         status = "AT";
548                 else
549                         status = "--";
550                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
551                          i, status, obd->obd_type->typ_name,
552                          obd->obd_name, obd->obd_uuid.uuid,
553                          atomic_read(&obd->obd_refcount));
554         }
555         read_unlock(&obd_dev_lock);
556         return;
557 }
558
559 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
560    specified, then only the client with that uuid is returned,
561    otherwise any client connected to the tgt is returned. */
562 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
563                                           const char * typ_name,
564                                           struct obd_uuid *grp_uuid)
565 {
566         int i;
567
568         read_lock(&obd_dev_lock);
569         for (i = 0; i < class_devno_max(); i++) {
570                 struct obd_device *obd = class_num2obd(i);
571
572                 if (obd == NULL)
573                         continue;
574                 if ((strncmp(obd->obd_type->typ_name, typ_name,
575                              strlen(typ_name)) == 0)) {
576                         if (obd_uuid_equals(tgt_uuid,
577                                             &obd->u.cli.cl_target_uuid) &&
578                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
579                                                          &obd->obd_uuid) : 1)) {
580                                 read_unlock(&obd_dev_lock);
581                                 return obd;
582                         }
583                 }
584         }
585         read_unlock(&obd_dev_lock);
586
587         return NULL;
588 }
589 EXPORT_SYMBOL(class_find_client_obd);
590
591 /* Iterate the obd_device list looking devices have grp_uuid. Start
592    searching at *next, and if a device is found, the next index to look
593    at is saved in *next. If next is NULL, then the first matching device
594    will always be returned. */
595 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
596 {
597         int i;
598
599         if (next == NULL)
600                 i = 0;
601         else if (*next >= 0 && *next < class_devno_max())
602                 i = *next;
603         else
604                 return NULL;
605
606         read_lock(&obd_dev_lock);
607         for (; i < class_devno_max(); i++) {
608                 struct obd_device *obd = class_num2obd(i);
609
610                 if (obd == NULL)
611                         continue;
612                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
613                         if (next != NULL)
614                                 *next = i+1;
615                         read_unlock(&obd_dev_lock);
616                         return obd;
617                 }
618         }
619         read_unlock(&obd_dev_lock);
620
621         return NULL;
622 }
623 EXPORT_SYMBOL(class_devices_in_group);
624
625 /**
626  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
627  * adjust sptlrpc settings accordingly.
628  */
629 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
630 {
631         struct obd_device  *obd;
632         const char         *type;
633         int                 i, rc = 0, rc2;
634
635         LASSERT(namelen > 0);
636
637         read_lock(&obd_dev_lock);
638         for (i = 0; i < class_devno_max(); i++) {
639                 obd = class_num2obd(i);
640
641                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
642                         continue;
643
644                 /* only notify mdc, osc, mdt, ost */
645                 type = obd->obd_type->typ_name;
646                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
647                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
648                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
649                     strcmp(type, LUSTRE_OST_NAME) != 0)
650                         continue;
651
652                 if (strncmp(obd->obd_name, fsname, namelen))
653                         continue;
654
655                 class_incref(obd, __FUNCTION__, obd);
656                 read_unlock(&obd_dev_lock);
657                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
658                                          sizeof(KEY_SPTLRPC_CONF),
659                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
660                 rc = rc ? rc : rc2;
661                 class_decref(obd, __FUNCTION__, obd);
662                 read_lock(&obd_dev_lock);
663         }
664         read_unlock(&obd_dev_lock);
665         return rc;
666 }
667 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
668
669 void obd_cleanup_caches(void)
670 {
671         ENTRY;
672         if (obd_device_cachep) {
673                 kmem_cache_destroy(obd_device_cachep);
674                 obd_device_cachep = NULL;
675         }
676         if (obdo_cachep) {
677                 kmem_cache_destroy(obdo_cachep);
678                 obdo_cachep = NULL;
679         }
680         if (import_cachep) {
681                 kmem_cache_destroy(import_cachep);
682                 import_cachep = NULL;
683         }
684         if (capa_cachep) {
685                 kmem_cache_destroy(capa_cachep);
686                 capa_cachep = NULL;
687         }
688         EXIT;
689 }
690
691 int obd_init_caches(void)
692 {
693         int rc;
694         ENTRY;
695
696         LASSERT(obd_device_cachep == NULL);
697         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
698                                               sizeof(struct obd_device),
699                                               0, 0, NULL);
700         if (!obd_device_cachep)
701                 GOTO(out, rc = -ENOMEM);
702
703         LASSERT(obdo_cachep == NULL);
704         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
705                                         0, 0, NULL);
706         if (!obdo_cachep)
707                 GOTO(out, rc = -ENOMEM);
708
709         LASSERT(import_cachep == NULL);
710         import_cachep = kmem_cache_create("ll_import_cache",
711                                           sizeof(struct obd_import),
712                                           0, 0, NULL);
713         if (!import_cachep)
714                 GOTO(out, rc = -ENOMEM);
715
716         LASSERT(capa_cachep == NULL);
717         capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
718                                         0, 0, NULL);
719         if (!capa_cachep)
720                 GOTO(out, rc = -ENOMEM);
721
722         RETURN(0);
723 out:
724         obd_cleanup_caches();
725         RETURN(rc);
726 }
727
728 /* map connection to client */
729 struct obd_export *class_conn2export(struct lustre_handle *conn)
730 {
731         struct obd_export *export;
732         ENTRY;
733
734         if (!conn) {
735                 CDEBUG(D_CACHE, "looking for null handle\n");
736                 RETURN(NULL);
737         }
738
739         if (conn->cookie == -1) {  /* this means assign a new connection */
740                 CDEBUG(D_CACHE, "want a new connection\n");
741                 RETURN(NULL);
742         }
743
744         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
745         export = class_handle2object(conn->cookie, NULL);
746         RETURN(export);
747 }
748 EXPORT_SYMBOL(class_conn2export);
749
750 struct obd_device *class_exp2obd(struct obd_export *exp)
751 {
752         if (exp)
753                 return exp->exp_obd;
754         return NULL;
755 }
756 EXPORT_SYMBOL(class_exp2obd);
757
758 struct obd_device *class_conn2obd(struct lustre_handle *conn)
759 {
760         struct obd_export *export;
761         export = class_conn2export(conn);
762         if (export) {
763                 struct obd_device *obd = export->exp_obd;
764                 class_export_put(export);
765                 return obd;
766         }
767         return NULL;
768 }
769 EXPORT_SYMBOL(class_conn2obd);
770
771 struct obd_import *class_exp2cliimp(struct obd_export *exp)
772 {
773         struct obd_device *obd = exp->exp_obd;
774         if (obd == NULL)
775                 return NULL;
776         return obd->u.cli.cl_import;
777 }
778 EXPORT_SYMBOL(class_exp2cliimp);
779
780 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
781 {
782         struct obd_device *obd = class_conn2obd(conn);
783         if (obd == NULL)
784                 return NULL;
785         return obd->u.cli.cl_import;
786 }
787 EXPORT_SYMBOL(class_conn2cliimp);
788
789 /* Export management functions */
790 static void class_export_destroy(struct obd_export *exp)
791 {
792         struct obd_device *obd = exp->exp_obd;
793         ENTRY;
794
795         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
796         LASSERT(obd != NULL);
797
798         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
799                exp->exp_client_uuid.uuid, obd->obd_name);
800
801         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
802         if (exp->exp_connection)
803                 ptlrpc_put_connection_superhack(exp->exp_connection);
804
805         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
806         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
807         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
808         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
809         obd_destroy_export(exp);
810         class_decref(obd, "export", exp);
811
812         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
813         EXIT;
814 }
815
/**
 * hop_addref callback of export_handle_ops: take a reference on the export
 * passed as an untyped pointer.
 *
 * \param[in] export    the struct obd_export to reference
 */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
820
/* Handle operations passed to class_handle_hash() for every new export:
 * hop_addref takes an export reference through class_export_get(); no
 * hop_free callback is provided. */
static struct portals_handle_ops export_handle_ops = {
        .hop_addref = export_handle_addref,
        .hop_free   = NULL,
};
825
826 struct obd_export *class_export_get(struct obd_export *exp)
827 {
828         atomic_inc(&exp->exp_refcount);
829         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
830                atomic_read(&exp->exp_refcount));
831         return exp;
832 }
833 EXPORT_SYMBOL(class_export_get);
834
835 void class_export_put(struct obd_export *exp)
836 {
837         LASSERT(exp != NULL);
838         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
839         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
840                atomic_read(&exp->exp_refcount) - 1);
841
842         if (atomic_dec_and_test(&exp->exp_refcount)) {
843                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
844                 CDEBUG(D_IOCTL, "final put %p/%s\n",
845                        exp, exp->exp_client_uuid.uuid);
846
847                 /* release nid stat refererence */
848                 lprocfs_exp_cleanup(exp);
849
850                 obd_zombie_export_add(exp);
851         }
852 }
853 EXPORT_SYMBOL(class_export_put);
854
855 /* Creates a new export, adds it to the hash table, and returns a
856  * pointer to it. The refcount is 2: one for the hash reference, and
857  * one for the pointer returned by this function. */
858 struct obd_export *class_new_export(struct obd_device *obd,
859                                     struct obd_uuid *cluuid)
860 {
861         struct obd_export *export;
862         cfs_hash_t *hash = NULL;
863         int rc = 0;
864         ENTRY;
865
866         OBD_ALLOC_PTR(export);
867         if (!export)
868                 return ERR_PTR(-ENOMEM);
869
870         export->exp_conn_cnt = 0;
871         export->exp_lock_hash = NULL;
872         export->exp_flock_hash = NULL;
873         atomic_set(&export->exp_refcount, 2);
874         atomic_set(&export->exp_rpc_count, 0);
875         atomic_set(&export->exp_cb_count, 0);
876         atomic_set(&export->exp_locks_count, 0);
877 #if LUSTRE_TRACKS_LOCK_EXP_REFS
878         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
879         spin_lock_init(&export->exp_locks_list_guard);
880 #endif
881         atomic_set(&export->exp_replay_count, 0);
882         export->exp_obd = obd;
883         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
884         spin_lock_init(&export->exp_uncommitted_replies_lock);
885         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
886         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
887         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
888         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
889         CFS_INIT_LIST_HEAD(&export->exp_reg_rpcs);
890         class_handle_hash(&export->exp_handle, &export_handle_ops);
891         export->exp_last_request_time = cfs_time_current_sec();
892         spin_lock_init(&export->exp_lock);
893         spin_lock_init(&export->exp_rpc_lock);
894         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
895         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
896         spin_lock_init(&export->exp_bl_list_lock);
897         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
898
899         export->exp_sp_peer = LUSTRE_SP_ANY;
900         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
901         export->exp_client_uuid = *cluuid;
902         obd_init_export(export);
903
904         spin_lock(&obd->obd_dev_lock);
905         /* shouldn't happen, but might race */
906         if (obd->obd_stopping)
907                 GOTO(exit_unlock, rc = -ENODEV);
908
909         hash = cfs_hash_getref(obd->obd_uuid_hash);
910         if (hash == NULL)
911                 GOTO(exit_unlock, rc = -ENODEV);
912         spin_unlock(&obd->obd_dev_lock);
913
914         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
915                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
916                 if (rc != 0) {
917                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
918                                       obd->obd_name, cluuid->uuid, rc);
919                         GOTO(exit_err, rc = -EALREADY);
920                 }
921         }
922
923         spin_lock(&obd->obd_dev_lock);
924         if (obd->obd_stopping) {
925                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
926                 GOTO(exit_unlock, rc = -ENODEV);
927         }
928
929         class_incref(obd, "export", export);
930         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
931         cfs_list_add_tail(&export->exp_obd_chain_timed,
932                           &export->exp_obd->obd_exports_timed);
933         export->exp_obd->obd_num_exports++;
934         spin_unlock(&obd->obd_dev_lock);
935         cfs_hash_putref(hash);
936         RETURN(export);
937
938 exit_unlock:
939         spin_unlock(&obd->obd_dev_lock);
940 exit_err:
941         if (hash)
942                 cfs_hash_putref(hash);
943         class_handle_unhash(&export->exp_handle);
944         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
945         obd_destroy_export(export);
946         OBD_FREE_PTR(export);
947         return ERR_PTR(rc);
948 }
949 EXPORT_SYMBOL(class_new_export);
950
951 void class_unlink_export(struct obd_export *exp)
952 {
953         class_handle_unhash(&exp->exp_handle);
954
955         spin_lock(&exp->exp_obd->obd_dev_lock);
956         /* delete an uuid-export hashitem from hashtables */
957         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
958                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
959                              &exp->exp_client_uuid,
960                              &exp->exp_uuid_hash);
961
962         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
963         cfs_list_del_init(&exp->exp_obd_chain_timed);
964         exp->exp_obd->obd_num_exports--;
965         spin_unlock(&exp->exp_obd->obd_dev_lock);
966         class_export_put(exp);
967 }
968 EXPORT_SYMBOL(class_unlink_export);
969
970 /* Import management functions */
971 void class_import_destroy(struct obd_import *imp)
972 {
973         ENTRY;
974
975         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
976                 imp->imp_obd->obd_name);
977
978         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
979
980         ptlrpc_put_connection_superhack(imp->imp_connection);
981
982         while (!cfs_list_empty(&imp->imp_conn_list)) {
983                 struct obd_import_conn *imp_conn;
984
985                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
986                                           struct obd_import_conn, oic_item);
987                 cfs_list_del_init(&imp_conn->oic_item);
988                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
989                 OBD_FREE(imp_conn, sizeof(*imp_conn));
990         }
991
992         LASSERT(imp->imp_sec == NULL);
993         class_decref(imp->imp_obd, "import", imp);
994         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
995         EXIT;
996 }
997
998 static void import_handle_addref(void *import)
999 {
1000         class_import_get(import);
1001 }
1002
1003 static struct portals_handle_ops import_handle_ops = {
1004         .hop_addref = import_handle_addref,
1005         .hop_free   = NULL,
1006 };
1007
1008 struct obd_import *class_import_get(struct obd_import *import)
1009 {
1010         atomic_inc(&import->imp_refcount);
1011         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1012                atomic_read(&import->imp_refcount),
1013                import->imp_obd->obd_name);
1014         return import;
1015 }
1016 EXPORT_SYMBOL(class_import_get);
1017
1018 void class_import_put(struct obd_import *imp)
1019 {
1020         ENTRY;
1021
1022         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1023         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1024
1025         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1026                atomic_read(&imp->imp_refcount) - 1,
1027                imp->imp_obd->obd_name);
1028
1029         if (atomic_dec_and_test(&imp->imp_refcount)) {
1030                 CDEBUG(D_INFO, "final put import %p\n", imp);
1031                 obd_zombie_import_add(imp);
1032         }
1033
1034         /* catch possible import put race */
1035         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1036         EXIT;
1037 }
1038 EXPORT_SYMBOL(class_import_put);
1039
1040 static void init_imp_at(struct imp_at *at) {
1041         int i;
1042         at_init(&at->iat_net_latency, 0, 0);
1043         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1044                 /* max service estimates are tracked on the server side, so
1045                    don't use the AT history here, just use the last reported
1046                    val. (But keep hist for proc histogram, worst_ever) */
1047                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1048                         AT_FLG_NOHIST);
1049         }
1050 }
1051
1052 struct obd_import *class_new_import(struct obd_device *obd)
1053 {
1054         struct obd_import *imp;
1055
1056         OBD_ALLOC(imp, sizeof(*imp));
1057         if (imp == NULL)
1058                 return NULL;
1059
1060         CFS_INIT_LIST_HEAD(&imp->imp_pinger_chain);
1061         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1062         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1063         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1064         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1065         CFS_INIT_LIST_HEAD(&imp->imp_committed_list);
1066         imp->imp_replay_cursor = &imp->imp_committed_list;
1067         spin_lock_init(&imp->imp_lock);
1068         imp->imp_last_success_conn = 0;
1069         imp->imp_state = LUSTRE_IMP_NEW;
1070         imp->imp_obd = class_incref(obd, "import", imp);
1071         mutex_init(&imp->imp_sec_mutex);
1072         init_waitqueue_head(&imp->imp_recovery_waitq);
1073
1074         atomic_set(&imp->imp_refcount, 2);
1075         atomic_set(&imp->imp_unregistering, 0);
1076         atomic_set(&imp->imp_inflight, 0);
1077         atomic_set(&imp->imp_replay_inflight, 0);
1078         atomic_set(&imp->imp_inval_count, 0);
1079         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1080         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1081         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1082         init_imp_at(&imp->imp_at);
1083
1084         /* the default magic is V2, will be used in connect RPC, and
1085          * then adjusted according to the flags in request/reply. */
1086         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1087
1088         return imp;
1089 }
1090 EXPORT_SYMBOL(class_new_import);
1091
1092 void class_destroy_import(struct obd_import *import)
1093 {
1094         LASSERT(import != NULL);
1095         LASSERT(import != LP_POISON);
1096
1097         class_handle_unhash(&import->imp_handle);
1098
1099         spin_lock(&import->imp_lock);
1100         import->imp_generation++;
1101         spin_unlock(&import->imp_lock);
1102         class_import_put(import);
1103 }
1104 EXPORT_SYMBOL(class_destroy_import);
1105
1106 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1107
1108 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1109 {
1110         spin_lock(&exp->exp_locks_list_guard);
1111
1112         LASSERT(lock->l_exp_refs_nr >= 0);
1113
1114         if (lock->l_exp_refs_target != NULL &&
1115             lock->l_exp_refs_target != exp) {
1116                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1117                               exp, lock, lock->l_exp_refs_target);
1118         }
1119         if ((lock->l_exp_refs_nr ++) == 0) {
1120                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1121                 lock->l_exp_refs_target = exp;
1122         }
1123         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1124                lock, exp, lock->l_exp_refs_nr);
1125         spin_unlock(&exp->exp_locks_list_guard);
1126 }
1127 EXPORT_SYMBOL(__class_export_add_lock_ref);
1128
1129 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1130 {
1131         spin_lock(&exp->exp_locks_list_guard);
1132         LASSERT(lock->l_exp_refs_nr > 0);
1133         if (lock->l_exp_refs_target != exp) {
1134                 LCONSOLE_WARN("lock %p, "
1135                               "mismatching export pointers: %p, %p\n",
1136                               lock, lock->l_exp_refs_target, exp);
1137         }
1138         if (-- lock->l_exp_refs_nr == 0) {
1139                 cfs_list_del_init(&lock->l_exp_refs_link);
1140                 lock->l_exp_refs_target = NULL;
1141         }
1142         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1143                lock, exp, lock->l_exp_refs_nr);
1144         spin_unlock(&exp->exp_locks_list_guard);
1145 }
1146 EXPORT_SYMBOL(__class_export_del_lock_ref);
1147 #endif
1148
1149 /* A connection defines an export context in which preallocation can
1150    be managed. This releases the export pointer reference, and returns
1151    the export handle, so the export refcount is 1 when this function
1152    returns. */
1153 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1154                   struct obd_uuid *cluuid)
1155 {
1156         struct obd_export *export;
1157         LASSERT(conn != NULL);
1158         LASSERT(obd != NULL);
1159         LASSERT(cluuid != NULL);
1160         ENTRY;
1161
1162         export = class_new_export(obd, cluuid);
1163         if (IS_ERR(export))
1164                 RETURN(PTR_ERR(export));
1165
1166         conn->cookie = export->exp_handle.h_cookie;
1167         class_export_put(export);
1168
1169         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1170                cluuid->uuid, conn->cookie);
1171         RETURN(0);
1172 }
1173 EXPORT_SYMBOL(class_connect);
1174
1175 /* if export is involved in recovery then clean up related things */
1176 void class_export_recovery_cleanup(struct obd_export *exp)
1177 {
1178         struct obd_device *obd = exp->exp_obd;
1179
1180         spin_lock(&obd->obd_recovery_task_lock);
1181         if (exp->exp_delayed)
1182                 obd->obd_delayed_clients--;
1183         if (obd->obd_recovering) {
1184                 if (exp->exp_in_recovery) {
1185                         spin_lock(&exp->exp_lock);
1186                         exp->exp_in_recovery = 0;
1187                         spin_unlock(&exp->exp_lock);
1188                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1189                         atomic_dec(&obd->obd_connected_clients);
1190                 }
1191
1192                 /* if called during recovery then should update
1193                  * obd_stale_clients counter,
1194                  * lightweight exports are not counted */
1195                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1196                         exp->exp_obd->obd_stale_clients++;
1197         }
1198         spin_unlock(&obd->obd_recovery_task_lock);
1199         /** Cleanup req replay fields */
1200         if (exp->exp_req_replay_needed) {
1201                 spin_lock(&exp->exp_lock);
1202                 exp->exp_req_replay_needed = 0;
1203                 spin_unlock(&exp->exp_lock);
1204                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1205                 atomic_dec(&obd->obd_req_replay_clients);
1206         }
1207         /** Cleanup lock replay data */
1208         if (exp->exp_lock_replay_needed) {
1209                 spin_lock(&exp->exp_lock);
1210                 exp->exp_lock_replay_needed = 0;
1211                 spin_unlock(&exp->exp_lock);
1212                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1213                 atomic_dec(&obd->obd_lock_replay_clients);
1214         }
1215 }
1216
1217 /* This function removes 1-3 references from the export:
1218  * 1 - for export pointer passed
1219  * and if disconnect really need
1220  * 2 - removing from hash
1221  * 3 - in client_unlink_export
1222  * The export pointer passed to this function can destroyed */
1223 int class_disconnect(struct obd_export *export)
1224 {
1225         int already_disconnected;
1226         ENTRY;
1227
1228         if (export == NULL) {
1229                 CWARN("attempting to free NULL export %p\n", export);
1230                 RETURN(-EINVAL);
1231         }
1232
1233         spin_lock(&export->exp_lock);
1234         already_disconnected = export->exp_disconnected;
1235         export->exp_disconnected = 1;
1236         spin_unlock(&export->exp_lock);
1237
1238         /* class_cleanup(), abort_recovery(), and class_fail_export()
1239          * all end up in here, and if any of them race we shouldn't
1240          * call extra class_export_puts(). */
1241         if (already_disconnected) {
1242                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1243                 GOTO(no_disconn, already_disconnected);
1244         }
1245
1246         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1247                export->exp_handle.h_cookie);
1248
1249         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1250                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1251                              &export->exp_connection->c_peer.nid,
1252                              &export->exp_nid_hash);
1253
1254         class_export_recovery_cleanup(export);
1255         class_unlink_export(export);
1256 no_disconn:
1257         class_export_put(export);
1258         RETURN(0);
1259 }
1260 EXPORT_SYMBOL(class_disconnect);
1261
1262 /* Return non-zero for a fully connected export */
1263 int class_connected_export(struct obd_export *exp)
1264 {
1265         int connected = 0;
1266
1267         if (exp) {
1268                 spin_lock(&exp->exp_lock);
1269                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1270                 spin_unlock(&exp->exp_lock);
1271         }
1272         return connected;
1273 }
1274 EXPORT_SYMBOL(class_connected_export);
1275
1276 static void class_disconnect_export_list(cfs_list_t *list,
1277                                          enum obd_option flags)
1278 {
1279         int rc;
1280         struct obd_export *exp;
1281         ENTRY;
1282
1283         /* It's possible that an export may disconnect itself, but
1284          * nothing else will be added to this list. */
1285         while (!cfs_list_empty(list)) {
1286                 exp = cfs_list_entry(list->next, struct obd_export,
1287                                      exp_obd_chain);
1288                 /* need for safe call CDEBUG after obd_disconnect */
1289                 class_export_get(exp);
1290
1291                 spin_lock(&exp->exp_lock);
1292                 exp->exp_flags = flags;
1293                 spin_unlock(&exp->exp_lock);
1294
1295                 if (obd_uuid_equals(&exp->exp_client_uuid,
1296                                     &exp->exp_obd->obd_uuid)) {
1297                         CDEBUG(D_HA,
1298                                "exp %p export uuid == obd uuid, don't discon\n",
1299                                exp);
1300                         /* Need to delete this now so we don't end up pointing
1301                          * to work_list later when this export is cleaned up. */
1302                         cfs_list_del_init(&exp->exp_obd_chain);
1303                         class_export_put(exp);
1304                         continue;
1305                 }
1306
1307                 class_export_get(exp);
1308                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1309                        "last request at "CFS_TIME_T"\n",
1310                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1311                        exp, exp->exp_last_request_time);
1312                 /* release one export reference anyway */
1313                 rc = obd_disconnect(exp);
1314
1315                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1316                        obd_export_nid2str(exp), exp, rc);
1317                 class_export_put(exp);
1318         }
1319         EXIT;
1320 }
1321
1322 void class_disconnect_exports(struct obd_device *obd)
1323 {
1324         cfs_list_t work_list;
1325         ENTRY;
1326
1327         /* Move all of the exports from obd_exports to a work list, en masse. */
1328         CFS_INIT_LIST_HEAD(&work_list);
1329         spin_lock(&obd->obd_dev_lock);
1330         cfs_list_splice_init(&obd->obd_exports, &work_list);
1331         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1332         spin_unlock(&obd->obd_dev_lock);
1333
1334         if (!cfs_list_empty(&work_list)) {
1335                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1336                        "disconnecting them\n", obd->obd_minor, obd);
1337                 class_disconnect_export_list(&work_list,
1338                                              exp_flags_from_obd(obd));
1339         } else
1340                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1341                        obd->obd_minor, obd);
1342         EXIT;
1343 }
1344 EXPORT_SYMBOL(class_disconnect_exports);
1345
1346 /* Remove exports that have not completed recovery.
1347  */
1348 void class_disconnect_stale_exports(struct obd_device *obd,
1349                                     int (*test_export)(struct obd_export *))
1350 {
1351         cfs_list_t work_list;
1352         struct obd_export *exp, *n;
1353         int evicted = 0;
1354         ENTRY;
1355
1356         CFS_INIT_LIST_HEAD(&work_list);
1357         spin_lock(&obd->obd_dev_lock);
1358         cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1359                                      exp_obd_chain) {
1360                 /* don't count self-export as client */
1361                 if (obd_uuid_equals(&exp->exp_client_uuid,
1362                                     &exp->exp_obd->obd_uuid))
1363                         continue;
1364
1365                 /* don't evict clients which have no slot in last_rcvd
1366                  * (e.g. lightweight connection) */
1367                 if (exp->exp_target_data.ted_lr_idx == -1)
1368                         continue;
1369
1370                 spin_lock(&exp->exp_lock);
1371                 if (exp->exp_failed || test_export(exp)) {
1372                         spin_unlock(&exp->exp_lock);
1373                         continue;
1374                 }
1375                 exp->exp_failed = 1;
1376                 spin_unlock(&exp->exp_lock);
1377
1378                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1379                 evicted++;
1380                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1381                        obd->obd_name, exp->exp_client_uuid.uuid,
1382                        exp->exp_connection == NULL ? "<unknown>" :
1383                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1384                 print_export_data(exp, "EVICTING", 0);
1385         }
1386         spin_unlock(&obd->obd_dev_lock);
1387
1388         if (evicted)
1389                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1390                               obd->obd_name, evicted);
1391
1392         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1393                                                  OBD_OPT_ABORT_RECOV);
1394         EXIT;
1395 }
1396 EXPORT_SYMBOL(class_disconnect_stale_exports);
1397
1398 void class_fail_export(struct obd_export *exp)
1399 {
1400         int rc, already_failed;
1401
1402         spin_lock(&exp->exp_lock);
1403         already_failed = exp->exp_failed;
1404         exp->exp_failed = 1;
1405         spin_unlock(&exp->exp_lock);
1406
1407         if (already_failed) {
1408                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1409                        exp, exp->exp_client_uuid.uuid);
1410                 return;
1411         }
1412
1413         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1414                exp, exp->exp_client_uuid.uuid);
1415
1416         if (obd_dump_on_timeout)
1417                 libcfs_debug_dumplog();
1418
1419         /* need for safe call CDEBUG after obd_disconnect */
1420         class_export_get(exp);
1421
1422         /* Most callers into obd_disconnect are removing their own reference
1423          * (request, for example) in addition to the one from the hash table.
1424          * We don't have such a reference here, so make one. */
1425         class_export_get(exp);
1426         rc = obd_disconnect(exp);
1427         if (rc)
1428                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1429         else
1430                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1431                        exp, exp->exp_client_uuid.uuid);
1432         class_export_put(exp);
1433 }
1434 EXPORT_SYMBOL(class_fail_export);
1435
1436 char *obd_export_nid2str(struct obd_export *exp)
1437 {
1438         if (exp->exp_connection != NULL)
1439                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1440
1441         return "(no nid)";
1442 }
1443 EXPORT_SYMBOL(obd_export_nid2str);
1444
1445 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1446 {
1447         cfs_hash_t *nid_hash;
1448         struct obd_export *doomed_exp = NULL;
1449         int exports_evicted = 0;
1450
1451         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1452
1453         spin_lock(&obd->obd_dev_lock);
1454         /* umount has run already, so evict thread should leave
1455          * its task to umount thread now */
1456         if (obd->obd_stopping) {
1457                 spin_unlock(&obd->obd_dev_lock);
1458                 return exports_evicted;
1459         }
1460         nid_hash = obd->obd_nid_hash;
1461         cfs_hash_getref(nid_hash);
1462         spin_unlock(&obd->obd_dev_lock);
1463
1464         do {
1465                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1466                 if (doomed_exp == NULL)
1467                         break;
1468
1469                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1470                          "nid %s found, wanted nid %s, requested nid %s\n",
1471                          obd_export_nid2str(doomed_exp),
1472                          libcfs_nid2str(nid_key), nid);
1473                 LASSERTF(doomed_exp != obd->obd_self_export,
1474                          "self-export is hashed by NID?\n");
1475                 exports_evicted++;
1476                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1477                               "request\n", obd->obd_name,
1478                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1479                               obd_export_nid2str(doomed_exp));
1480                 class_fail_export(doomed_exp);
1481                 class_export_put(doomed_exp);
1482         } while (1);
1483
1484         cfs_hash_putref(nid_hash);
1485
1486         if (!exports_evicted)
1487                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1488                        obd->obd_name, nid);
1489         return exports_evicted;
1490 }
1491 EXPORT_SYMBOL(obd_export_evict_by_nid);
1492
1493 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1494 {
1495         cfs_hash_t *uuid_hash;
1496         struct obd_export *doomed_exp = NULL;
1497         struct obd_uuid doomed_uuid;
1498         int exports_evicted = 0;
1499
1500         spin_lock(&obd->obd_dev_lock);
1501         if (obd->obd_stopping) {
1502                 spin_unlock(&obd->obd_dev_lock);
1503                 return exports_evicted;
1504         }
1505         uuid_hash = obd->obd_uuid_hash;
1506         cfs_hash_getref(uuid_hash);
1507         spin_unlock(&obd->obd_dev_lock);
1508
1509         obd_str2uuid(&doomed_uuid, uuid);
1510         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1511                 CERROR("%s: can't evict myself\n", obd->obd_name);
1512                 cfs_hash_putref(uuid_hash);
1513                 return exports_evicted;
1514         }
1515
1516         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1517
1518         if (doomed_exp == NULL) {
1519                 CERROR("%s: can't disconnect %s: no exports found\n",
1520                        obd->obd_name, uuid);
1521         } else {
1522                 CWARN("%s: evicting %s at adminstrative request\n",
1523                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1524                 class_fail_export(doomed_exp);
1525                 class_export_put(doomed_exp);
1526                 exports_evicted++;
1527         }
1528         cfs_hash_putref(uuid_hash);
1529
1530         return exports_evicted;
1531 }
1532 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1533
1534 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1535 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1536 EXPORT_SYMBOL(class_export_dump_hook);
1537 #endif
1538
1539 static void print_export_data(struct obd_export *exp, const char *status,
1540                               int locks)
1541 {
1542         struct ptlrpc_reply_state *rs;
1543         struct ptlrpc_reply_state *first_reply = NULL;
1544         int nreplies = 0;
1545
1546         spin_lock(&exp->exp_lock);
1547         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1548                                 rs_exp_list) {
1549                 if (nreplies == 0)
1550                         first_reply = rs;
1551                 nreplies++;
1552         }
1553         spin_unlock(&exp->exp_lock);
1554
1555         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1556                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1557                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1558                atomic_read(&exp->exp_rpc_count),
1559                atomic_read(&exp->exp_cb_count),
1560                atomic_read(&exp->exp_locks_count),
1561                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1562                nreplies, first_reply, nreplies > 3 ? "..." : "",
1563                exp->exp_last_committed);
1564 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1565         if (locks && class_export_dump_hook != NULL)
1566                 class_export_dump_hook(exp);
1567 #endif
1568 }
1569
1570 void dump_exports(struct obd_device *obd, int locks)
1571 {
1572         struct obd_export *exp;
1573
1574         spin_lock(&obd->obd_dev_lock);
1575         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1576                 print_export_data(exp, "ACTIVE", locks);
1577         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1578                 print_export_data(exp, "UNLINKED", locks);
1579         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1580                 print_export_data(exp, "DELAYED", locks);
1581         spin_unlock(&obd->obd_dev_lock);
1582         spin_lock(&obd_zombie_impexp_lock);
1583         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1584                 print_export_data(exp, "ZOMBIE", locks);
1585         spin_unlock(&obd_zombie_impexp_lock);
1586 }
1587 EXPORT_SYMBOL(dump_exports);
1588
1589 void obd_exports_barrier(struct obd_device *obd)
1590 {
1591         int waited = 2;
1592         LASSERT(cfs_list_empty(&obd->obd_exports));
1593         spin_lock(&obd->obd_dev_lock);
1594         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1595                 spin_unlock(&obd->obd_dev_lock);
1596                 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1597                                                    cfs_time_seconds(waited));
1598                 if (waited > 5 && IS_PO2(waited)) {
1599                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1600                                       "more than %d seconds. "
1601                                       "The obd refcount = %d. Is it stuck?\n",
1602                                       obd->obd_name, waited,
1603                                       atomic_read(&obd->obd_refcount));
1604                         dump_exports(obd, 1);
1605                 }
1606                 waited *= 2;
1607                 spin_lock(&obd->obd_dev_lock);
1608         }
1609         spin_unlock(&obd->obd_dev_lock);
1610 }
1611 EXPORT_SYMBOL(obd_exports_barrier);
1612
1613 /* Total amount of zombies to be destroyed */
1614 static int zombies_count = 0;
1615
1616 /**
1617  * kill zombie imports and exports
1618  */
1619 void obd_zombie_impexp_cull(void)
1620 {
1621         struct obd_import *import;
1622         struct obd_export *export;
1623         ENTRY;
1624
1625         do {
1626                 spin_lock(&obd_zombie_impexp_lock);
1627
1628                 import = NULL;
1629                 if (!cfs_list_empty(&obd_zombie_imports)) {
1630                         import = cfs_list_entry(obd_zombie_imports.next,
1631                                                 struct obd_import,
1632                                                 imp_zombie_chain);
1633                         cfs_list_del_init(&import->imp_zombie_chain);
1634                 }
1635
1636                 export = NULL;
1637                 if (!cfs_list_empty(&obd_zombie_exports)) {
1638                         export = cfs_list_entry(obd_zombie_exports.next,
1639                                                 struct obd_export,
1640                                                 exp_obd_chain);
1641                         cfs_list_del_init(&export->exp_obd_chain);
1642                 }
1643
1644                 spin_unlock(&obd_zombie_impexp_lock);
1645
1646                 if (import != NULL) {
1647                         class_import_destroy(import);
1648                         spin_lock(&obd_zombie_impexp_lock);
1649                         zombies_count--;
1650                         spin_unlock(&obd_zombie_impexp_lock);
1651                 }
1652
1653                 if (export != NULL) {
1654                         class_export_destroy(export);
1655                         spin_lock(&obd_zombie_impexp_lock);
1656                         zombies_count--;
1657                         spin_unlock(&obd_zombie_impexp_lock);
1658                 }
1659
1660                 cond_resched();
1661         } while (import != NULL || export != NULL);
1662         EXIT;
1663 }
1664
1665 static struct completion        obd_zombie_start;
1666 static struct completion        obd_zombie_stop;
1667 static unsigned long            obd_zombie_flags;
1668 static wait_queue_head_t        obd_zombie_waitq;
1669 static pid_t                    obd_zombie_pid;
1670
1671 enum {
1672         OBD_ZOMBIE_STOP         = 0x0001,
1673 };
1674
1675 /**
1676  * check for work for kill zombie import/export thread.
1677  */
1678 static int obd_zombie_impexp_check(void *arg)
1679 {
1680         int rc;
1681
1682         spin_lock(&obd_zombie_impexp_lock);
1683         rc = (zombies_count == 0) &&
1684              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1685         spin_unlock(&obd_zombie_impexp_lock);
1686
1687         RETURN(rc);
1688 }
1689
1690 /**
1691  * Add export to the obd_zombe thread and notify it.
1692  */
1693 static void obd_zombie_export_add(struct obd_export *exp) {
1694         spin_lock(&exp->exp_obd->obd_dev_lock);
1695         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1696         cfs_list_del_init(&exp->exp_obd_chain);
1697         spin_unlock(&exp->exp_obd->obd_dev_lock);
1698         spin_lock(&obd_zombie_impexp_lock);
1699         zombies_count++;
1700         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1701         spin_unlock(&obd_zombie_impexp_lock);
1702
1703         obd_zombie_impexp_notify();
1704 }
1705
1706 /**
1707  * Add import to the obd_zombe thread and notify it.
1708  */
1709 static void obd_zombie_import_add(struct obd_import *imp) {
1710         LASSERT(imp->imp_sec == NULL);
1711         LASSERT(imp->imp_rq_pool == NULL);
1712         spin_lock(&obd_zombie_impexp_lock);
1713         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1714         zombies_count++;
1715         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1716         spin_unlock(&obd_zombie_impexp_lock);
1717
1718         obd_zombie_impexp_notify();
1719 }
1720
1721 /**
1722  * notify import/export destroy thread about new zombie.
1723  */
1724 static void obd_zombie_impexp_notify(void)
1725 {
1726         /*
1727          * Make sure obd_zomebie_impexp_thread get this notification.
1728          * It is possible this signal only get by obd_zombie_barrier, and
1729          * barrier gulps this notification and sleeps away and hangs ensues
1730          */
1731         wake_up_all(&obd_zombie_waitq);
1732 }
1733
1734 /**
1735  * check whether obd_zombie is idle
1736  */
1737 static int obd_zombie_is_idle(void)
1738 {
1739         int rc;
1740
1741         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1742         spin_lock(&obd_zombie_impexp_lock);
1743         rc = (zombies_count == 0);
1744         spin_unlock(&obd_zombie_impexp_lock);
1745         return rc;
1746 }
1747
1748 /**
1749  * wait when obd_zombie import/export queues become empty
1750  */
1751 void obd_zombie_barrier(void)
1752 {
1753         struct l_wait_info lwi = { 0 };
1754
1755         if (obd_zombie_pid == current_pid())
1756                 /* don't wait for myself */
1757                 return;
1758         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1759 }
1760 EXPORT_SYMBOL(obd_zombie_barrier);
1761
1762 #ifdef __KERNEL__
1763
1764 /**
1765  * destroy zombie export/import thread.
1766  */
1767 static int obd_zombie_impexp_thread(void *unused)
1768 {
1769         unshare_fs_struct();
1770         complete(&obd_zombie_start);
1771
1772         obd_zombie_pid = current_pid();
1773
1774         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1775                 struct l_wait_info lwi = { 0 };
1776
1777                 l_wait_event(obd_zombie_waitq,
1778                              !obd_zombie_impexp_check(NULL), &lwi);
1779                 obd_zombie_impexp_cull();
1780
1781                 /*
1782                  * Notify obd_zombie_barrier callers that queues
1783                  * may be empty.
1784                  */
1785                 wake_up(&obd_zombie_waitq);
1786         }
1787
1788         complete(&obd_zombie_stop);
1789
1790         RETURN(0);
1791 }
1792
1793 #else /* ! KERNEL */
1794
1795 static atomic_t zombie_recur = ATOMIC_INIT(0);
1796 static void *obd_zombie_impexp_work_cb;
1797 static void *obd_zombie_impexp_idle_cb;
1798
1799 int obd_zombie_impexp_kill(void *arg)
1800 {
1801         int rc = 0;
1802
1803         if (atomic_inc_return(&zombie_recur) == 1) {
1804                 obd_zombie_impexp_cull();
1805                 rc = 1;
1806         }
1807         atomic_dec(&zombie_recur);
1808         return rc;
1809 }
1810
1811 #endif
1812
1813 /**
1814  * start destroy zombie import/export thread
1815  */
1816 int obd_zombie_impexp_init(void)
1817 {
1818 #ifdef __KERNEL__
1819         struct task_struct *task;
1820 #endif
1821
1822         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1823         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1824         spin_lock_init(&obd_zombie_impexp_lock);
1825         init_completion(&obd_zombie_start);
1826         init_completion(&obd_zombie_stop);
1827         init_waitqueue_head(&obd_zombie_waitq);
1828         obd_zombie_pid = 0;
1829
1830 #ifdef __KERNEL__
1831         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1832         if (IS_ERR(task))
1833                 RETURN(PTR_ERR(task));
1834
1835         wait_for_completion(&obd_zombie_start);
1836 #else
1837
1838         obd_zombie_impexp_work_cb =
1839                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1840                                                  &obd_zombie_impexp_kill, NULL);
1841
1842         obd_zombie_impexp_idle_cb =
1843                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1844                                                  &obd_zombie_impexp_check, NULL);
1845 #endif
1846         RETURN(0);
1847 }
1848 /**
1849  * stop destroy zombie import/export thread
1850  */
1851 void obd_zombie_impexp_stop(void)
1852 {
1853         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1854         obd_zombie_impexp_notify();
1855 #ifdef __KERNEL__
1856         wait_for_completion(&obd_zombie_stop);
1857 #else
1858         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1859         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1860 #endif
1861 }
1862
1863 /***** Kernel-userspace comm helpers *******/
1864
1865 /* Get length of entire message, including header */
1866 int kuc_len(int payload_len)
1867 {
1868         return sizeof(struct kuc_hdr) + payload_len;
1869 }
1870 EXPORT_SYMBOL(kuc_len);
1871
1872 /* Get a pointer to kuc header, given a ptr to the payload
1873  * @param p Pointer to payload area
1874  * @returns Pointer to kuc header
1875  */
1876 struct kuc_hdr * kuc_ptr(void *p)
1877 {
1878         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1879         LASSERT(lh->kuc_magic == KUC_MAGIC);
1880         return lh;
1881 }
1882 EXPORT_SYMBOL(kuc_ptr);
1883
1884 /* Test if payload is part of kuc message
1885  * @param p Pointer to payload area
1886  * @returns boolean
1887  */
1888 int kuc_ispayload(void *p)
1889 {
1890         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1891
1892         if (kh->kuc_magic == KUC_MAGIC)
1893                 return 1;
1894         else
1895                 return 0;
1896 }
1897 EXPORT_SYMBOL(kuc_ispayload);
1898
1899 /* Alloc space for a message, and fill in header
1900  * @return Pointer to payload area
1901  */
1902 void *kuc_alloc(int payload_len, int transport, int type)
1903 {
1904         struct kuc_hdr *lh;
1905         int len = kuc_len(payload_len);
1906
1907         OBD_ALLOC(lh, len);
1908         if (lh == NULL)
1909                 return ERR_PTR(-ENOMEM);
1910
1911         lh->kuc_magic = KUC_MAGIC;
1912         lh->kuc_transport = transport;
1913         lh->kuc_msgtype = type;
1914         lh->kuc_msglen = len;
1915
1916         return (void *)(lh + 1);
1917 }
1918 EXPORT_SYMBOL(kuc_alloc);
1919
/* Free a kuc message given a pointer to its payload area.
 *
 * Deliberately not declared "inline": the function is exported with
 * EXPORT_SYMBOL, so an out-of-line external definition has to exist.  A
 * plain (non-static, non-extern) "inline" definition only produces one
 * under GNU89 inline semantics; under C99/C11 semantics it does not.
 *
 * @param p           Pointer to payload area (not to the header); kuc_ptr()
 *                    asserts that a valid kuc header precedes it
 * @param payload_len Payload length; the size handed to OBD_FREE is
 *                    kuc_len(payload_len), so this should be the value the
 *                    message was allocated with
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1927
1928
1929