Whamcloud - gitweb
b832620a1788d5aed562dbdcabedf69163f66a94
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
50 extern cfs_list_t obd_types;
51 spinlock_t obd_types_lock;
52
53 struct kmem_cache *obd_device_cachep;
54 struct kmem_cache *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 struct kmem_cache *import_cachep;
57
58 cfs_list_t      obd_zombie_imports;
59 cfs_list_t      obd_zombie_exports;
60 spinlock_t  obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, __GFP_IO);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         cfs_list_t *tmp;
102         struct obd_type *type;
103
104         spin_lock(&obd_types_lock);
105         cfs_list_for_each(tmp, &obd_types) {
106                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115 EXPORT_SYMBOL(class_search_type);
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124
125                 if (strcmp(modname, "obdfilter") == 0)
126                         modname = "ofd";
127
128                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129                         modname = LUSTRE_OSP_NAME;
130
131                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132                         modname = LUSTRE_MDT_NAME;
133
134                 if (!request_module("%s", modname)) {
135                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136                         type = class_search_type(name);
137                 } else {
138                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139                                            modname);
140                 }
141         }
142 #endif
143         if (type) {
144                 spin_lock(&type->obd_type_lock);
145                 type->typ_refcnt++;
146                 try_module_get(type->typ_dt_ops->o_owner);
147                 spin_unlock(&type->obd_type_lock);
148         }
149         return type;
150 }
151 EXPORT_SYMBOL(class_get_type);
152
153 void class_put_type(struct obd_type *type)
154 {
155         LASSERT(type);
156         spin_lock(&type->obd_type_lock);
157         type->typ_refcnt--;
158         module_put(type->typ_dt_ops->o_owner);
159         spin_unlock(&type->obd_type_lock);
160 }
161 EXPORT_SYMBOL(class_put_type);
162
163 #define CLASS_MAX_NAME 1024
164
165 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
166                         struct lprocfs_seq_vars *module_vars,
167 #ifndef HAVE_ONLY_PROCFS_SEQ
168                         struct lprocfs_vars *vars,
169 #endif
170                         const char *name, struct lu_device_type *ldt)
171 {
172         struct obd_type *type;
173         int rc = 0;
174         ENTRY;
175
176         /* sanity check */
177         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
178
179         if (class_search_type(name)) {
180                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
181                 RETURN(-EEXIST);
182         }
183
184         rc = -ENOMEM;
185         OBD_ALLOC(type, sizeof(*type));
186         if (type == NULL)
187                 RETURN(rc);
188
189         OBD_ALLOC_PTR(type->typ_dt_ops);
190         OBD_ALLOC_PTR(type->typ_md_ops);
191         OBD_ALLOC(type->typ_name, strlen(name) + 1);
192
193         if (type->typ_dt_ops == NULL ||
194             type->typ_md_ops == NULL ||
195             type->typ_name == NULL)
196                 GOTO (failed, rc);
197
198         *(type->typ_dt_ops) = *dt_ops;
199         /* md_ops is optional */
200         if (md_ops)
201                 *(type->typ_md_ops) = *md_ops;
202         strcpy(type->typ_name, name);
203         spin_lock_init(&type->obd_type_lock);
204
205 #ifdef LPROCFS
206 #ifndef HAVE_ONLY_PROCFS_SEQ
207         if (vars) {
208                 type->typ_procroot = lprocfs_register(type->typ_name,
209                                                         proc_lustre_root,
210                                                         vars, type);
211         } else
212 #endif
213         {
214                 type->typ_procroot = lprocfs_seq_register(type->typ_name,
215                                                         proc_lustre_root,
216                                                         module_vars, type);
217         }
218         if (IS_ERR(type->typ_procroot)) {
219                 rc = PTR_ERR(type->typ_procroot);
220                 type->typ_procroot = NULL;
221                 GOTO (failed, rc);
222         }
223 #endif
224         if (ldt != NULL) {
225                 type->typ_lu = ldt;
226                 rc = lu_device_type_init(ldt);
227                 if (rc != 0)
228                         GOTO (failed, rc);
229         }
230
231         spin_lock(&obd_types_lock);
232         cfs_list_add(&type->typ_chain, &obd_types);
233         spin_unlock(&obd_types_lock);
234
235         RETURN (0);
236
237  failed:
238         if (type->typ_name != NULL)
239                 OBD_FREE(type->typ_name, strlen(name) + 1);
240         if (type->typ_md_ops != NULL)
241                 OBD_FREE_PTR(type->typ_md_ops);
242         if (type->typ_dt_ops != NULL)
243                 OBD_FREE_PTR(type->typ_dt_ops);
244 #ifdef LPROCFS
245 #ifndef HAVE_ONLY_PROCFS_SEQ
246         lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
247 #else
248         remove_proc_subtree(type->typ_name, proc_lustre_root);
249 #endif
250 #endif
251         OBD_FREE(type, sizeof(*type));
252         RETURN(rc);
253 }
254 EXPORT_SYMBOL(class_register_type);
255
256 int class_unregister_type(const char *name)
257 {
258         struct obd_type *type = class_search_type(name);
259         ENTRY;
260
261         if (!type) {
262                 CERROR("unknown obd type\n");
263                 RETURN(-EINVAL);
264         }
265
266         if (type->typ_refcnt) {
267                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
268                 /* This is a bad situation, let's make the best of it */
269                 /* Remove ops, but leave the name for debugging */
270                 OBD_FREE_PTR(type->typ_dt_ops);
271                 OBD_FREE_PTR(type->typ_md_ops);
272                 RETURN(-EBUSY);
273         }
274
275         /* we do not use type->typ_procroot as for compatibility purposes
276          * other modules can share names (i.e. lod can use lov entry). so
277          * we can't reference pointer as it can get invalided when another
278          * module removes the entry */
279 #ifdef LPROCFS
280 #ifndef HAVE_ONLY_PROCFS_SEQ
281         lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
282 #else
283         remove_proc_subtree(type->typ_name, proc_lustre_root);
284 #endif
285 #endif
286         if (type->typ_lu)
287                 lu_device_type_fini(type->typ_lu);
288
289         spin_lock(&obd_types_lock);
290         cfs_list_del(&type->typ_chain);
291         spin_unlock(&obd_types_lock);
292         OBD_FREE(type->typ_name, strlen(name) + 1);
293         if (type->typ_dt_ops != NULL)
294                 OBD_FREE_PTR(type->typ_dt_ops);
295         if (type->typ_md_ops != NULL)
296                 OBD_FREE_PTR(type->typ_md_ops);
297         OBD_FREE(type, sizeof(*type));
298         RETURN(0);
299 } /* class_unregister_type */
300 EXPORT_SYMBOL(class_unregister_type);
301
302 /**
303  * Create a new obd device.
304  *
305  * Find an empty slot in ::obd_devs[], create a new obd device in it.
306  *
307  * \param[in] type_name obd device type string.
308  * \param[in] name      obd device name.
309  *
310  * \retval NULL if create fails, otherwise return the obd device
311  *         pointer created.
312  */
313 struct obd_device *class_newdev(const char *type_name, const char *name)
314 {
315         struct obd_device *result = NULL;
316         struct obd_device *newdev;
317         struct obd_type *type = NULL;
318         int i;
319         int new_obd_minor = 0;
320         ENTRY;
321
322         if (strlen(name) >= MAX_OBD_NAME) {
323                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
324                 RETURN(ERR_PTR(-EINVAL));
325         }
326
327         type = class_get_type(type_name);
328         if (type == NULL){
329                 CERROR("OBD: unknown type: %s\n", type_name);
330                 RETURN(ERR_PTR(-ENODEV));
331         }
332
333         newdev = obd_device_alloc();
334         if (newdev == NULL)
335                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
336
337         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
338
339         write_lock(&obd_dev_lock);
340         for (i = 0; i < class_devno_max(); i++) {
341                 struct obd_device *obd = class_num2obd(i);
342
343                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
344                         CERROR("Device %s already exists at %d, won't add\n",
345                                name, i);
346                         if (result) {
347                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
348                                          "%p obd_magic %08x != %08x\n", result,
349                                          result->obd_magic, OBD_DEVICE_MAGIC);
350                                 LASSERTF(result->obd_minor == new_obd_minor,
351                                          "%p obd_minor %d != %d\n", result,
352                                          result->obd_minor, new_obd_minor);
353
354                                 obd_devs[result->obd_minor] = NULL;
355                                 result->obd_name[0]='\0';
356                          }
357                         result = ERR_PTR(-EEXIST);
358                         break;
359                 }
360                 if (!result && !obd) {
361                         result = newdev;
362                         result->obd_minor = i;
363                         new_obd_minor = i;
364                         result->obd_type = type;
365                         strncpy(result->obd_name, name,
366                                 sizeof(result->obd_name) - 1);
367                         obd_devs[i] = result;
368                 }
369         }
370         write_unlock(&obd_dev_lock);
371
372         if (result == NULL && i >= class_devno_max()) {
373                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
374                        class_devno_max());
375                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
376         }
377
378         if (IS_ERR(result))
379                 GOTO(out, result);
380
381         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
382                result->obd_name, result);
383
384         RETURN(result);
385 out:
386         obd_device_free(newdev);
387 out_type:
388         class_put_type(type);
389         return result;
390 }
391
392 void class_release_dev(struct obd_device *obd)
393 {
394         struct obd_type *obd_type = obd->obd_type;
395
396         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
397                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
398         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
399                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
400         LASSERT(obd_type != NULL);
401
402         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
403                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
404
405         write_lock(&obd_dev_lock);
406         obd_devs[obd->obd_minor] = NULL;
407         write_unlock(&obd_dev_lock);
408         obd_device_free(obd);
409
410         class_put_type(obd_type);
411 }
412
413 int class_name2dev(const char *name)
414 {
415         int i;
416
417         if (!name)
418                 return -1;
419
420         read_lock(&obd_dev_lock);
421         for (i = 0; i < class_devno_max(); i++) {
422                 struct obd_device *obd = class_num2obd(i);
423
424                 if (obd && strcmp(name, obd->obd_name) == 0) {
425                         /* Make sure we finished attaching before we give
426                            out any references */
427                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
428                         if (obd->obd_attached) {
429                                 read_unlock(&obd_dev_lock);
430                                 return i;
431                         }
432                         break;
433                 }
434         }
435         read_unlock(&obd_dev_lock);
436
437         return -1;
438 }
439 EXPORT_SYMBOL(class_name2dev);
440
441 struct obd_device *class_name2obd(const char *name)
442 {
443         int dev = class_name2dev(name);
444
445         if (dev < 0 || dev > class_devno_max())
446                 return NULL;
447         return class_num2obd(dev);
448 }
449 EXPORT_SYMBOL(class_name2obd);
450
451 int class_uuid2dev(struct obd_uuid *uuid)
452 {
453         int i;
454
455         read_lock(&obd_dev_lock);
456         for (i = 0; i < class_devno_max(); i++) {
457                 struct obd_device *obd = class_num2obd(i);
458
459                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
460                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
461                         read_unlock(&obd_dev_lock);
462                         return i;
463                 }
464         }
465         read_unlock(&obd_dev_lock);
466
467         return -1;
468 }
469 EXPORT_SYMBOL(class_uuid2dev);
470
471 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
472 {
473         int dev = class_uuid2dev(uuid);
474         if (dev < 0)
475                 return NULL;
476         return class_num2obd(dev);
477 }
478 EXPORT_SYMBOL(class_uuid2obd);
479
480 /**
481  * Get obd device from ::obd_devs[]
482  *
483  * \param num [in] array index
484  *
485  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
486  *         otherwise return the obd device there.
487  */
488 struct obd_device *class_num2obd(int num)
489 {
490         struct obd_device *obd = NULL;
491
492         if (num < class_devno_max()) {
493                 obd = obd_devs[num];
494                 if (obd == NULL)
495                         return NULL;
496
497                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
498                          "%p obd_magic %08x != %08x\n",
499                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
500                 LASSERTF(obd->obd_minor == num,
501                          "%p obd_minor %0d != %0d\n",
502                          obd, obd->obd_minor, num);
503         }
504
505         return obd;
506 }
507 EXPORT_SYMBOL(class_num2obd);
508
509 /**
510  * Get obd devices count. Device in any
511  *    state are counted
512  * \retval obd device count
513  */
514 int get_devices_count(void)
515 {
516         int index, max_index = class_devno_max(), dev_count = 0;
517
518         read_lock(&obd_dev_lock);
519         for (index = 0; index <= max_index; index++) {
520                 struct obd_device *obd = class_num2obd(index);
521                 if (obd != NULL)
522                         dev_count++;
523         }
524         read_unlock(&obd_dev_lock);
525
526         return dev_count;
527 }
528 EXPORT_SYMBOL(get_devices_count);
529
530 void class_obd_list(void)
531 {
532         char *status;
533         int i;
534
535         read_lock(&obd_dev_lock);
536         for (i = 0; i < class_devno_max(); i++) {
537                 struct obd_device *obd = class_num2obd(i);
538
539                 if (obd == NULL)
540                         continue;
541                 if (obd->obd_stopping)
542                         status = "ST";
543                 else if (obd->obd_set_up)
544                         status = "UP";
545                 else if (obd->obd_attached)
546                         status = "AT";
547                 else
548                         status = "--";
549                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
550                          i, status, obd->obd_type->typ_name,
551                          obd->obd_name, obd->obd_uuid.uuid,
552                          atomic_read(&obd->obd_refcount));
553         }
554         read_unlock(&obd_dev_lock);
555         return;
556 }
557
558 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
559    specified, then only the client with that uuid is returned,
560    otherwise any client connected to the tgt is returned. */
561 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
562                                           const char * typ_name,
563                                           struct obd_uuid *grp_uuid)
564 {
565         int i;
566
567         read_lock(&obd_dev_lock);
568         for (i = 0; i < class_devno_max(); i++) {
569                 struct obd_device *obd = class_num2obd(i);
570
571                 if (obd == NULL)
572                         continue;
573                 if ((strncmp(obd->obd_type->typ_name, typ_name,
574                              strlen(typ_name)) == 0)) {
575                         if (obd_uuid_equals(tgt_uuid,
576                                             &obd->u.cli.cl_target_uuid) &&
577                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
578                                                          &obd->obd_uuid) : 1)) {
579                                 read_unlock(&obd_dev_lock);
580                                 return obd;
581                         }
582                 }
583         }
584         read_unlock(&obd_dev_lock);
585
586         return NULL;
587 }
588 EXPORT_SYMBOL(class_find_client_obd);
589
590 /* Iterate the obd_device list looking devices have grp_uuid. Start
591    searching at *next, and if a device is found, the next index to look
592    at is saved in *next. If next is NULL, then the first matching device
593    will always be returned. */
594 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
595 {
596         int i;
597
598         if (next == NULL)
599                 i = 0;
600         else if (*next >= 0 && *next < class_devno_max())
601                 i = *next;
602         else
603                 return NULL;
604
605         read_lock(&obd_dev_lock);
606         for (; i < class_devno_max(); i++) {
607                 struct obd_device *obd = class_num2obd(i);
608
609                 if (obd == NULL)
610                         continue;
611                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
612                         if (next != NULL)
613                                 *next = i+1;
614                         read_unlock(&obd_dev_lock);
615                         return obd;
616                 }
617         }
618         read_unlock(&obd_dev_lock);
619
620         return NULL;
621 }
622 EXPORT_SYMBOL(class_devices_in_group);
623
624 /**
625  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
626  * adjust sptlrpc settings accordingly.
627  */
628 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
629 {
630         struct obd_device  *obd;
631         const char         *type;
632         int                 i, rc = 0, rc2;
633
634         LASSERT(namelen > 0);
635
636         read_lock(&obd_dev_lock);
637         for (i = 0; i < class_devno_max(); i++) {
638                 obd = class_num2obd(i);
639
640                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
641                         continue;
642
643                 /* only notify mdc, osc, mdt, ost */
644                 type = obd->obd_type->typ_name;
645                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
646                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
647                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
648                     strcmp(type, LUSTRE_OST_NAME) != 0)
649                         continue;
650
651                 if (strncmp(obd->obd_name, fsname, namelen))
652                         continue;
653
654                 class_incref(obd, __FUNCTION__, obd);
655                 read_unlock(&obd_dev_lock);
656                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
657                                          sizeof(KEY_SPTLRPC_CONF),
658                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
659                 rc = rc ? rc : rc2;
660                 class_decref(obd, __FUNCTION__, obd);
661                 read_lock(&obd_dev_lock);
662         }
663         read_unlock(&obd_dev_lock);
664         return rc;
665 }
666 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
667
668 void obd_cleanup_caches(void)
669 {
670         ENTRY;
671         if (obd_device_cachep) {
672                 kmem_cache_destroy(obd_device_cachep);
673                 obd_device_cachep = NULL;
674         }
675         if (obdo_cachep) {
676                 kmem_cache_destroy(obdo_cachep);
677                 obdo_cachep = NULL;
678         }
679         if (import_cachep) {
680                 kmem_cache_destroy(import_cachep);
681                 import_cachep = NULL;
682         }
683         if (capa_cachep) {
684                 kmem_cache_destroy(capa_cachep);
685                 capa_cachep = NULL;
686         }
687         EXIT;
688 }
689
690 int obd_init_caches(void)
691 {
692         int rc;
693         ENTRY;
694
695         LASSERT(obd_device_cachep == NULL);
696         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
697                                               sizeof(struct obd_device),
698                                               0, 0, NULL);
699         if (!obd_device_cachep)
700                 GOTO(out, rc = -ENOMEM);
701
702         LASSERT(obdo_cachep == NULL);
703         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
704                                         0, 0, NULL);
705         if (!obdo_cachep)
706                 GOTO(out, rc = -ENOMEM);
707
708         LASSERT(import_cachep == NULL);
709         import_cachep = kmem_cache_create("ll_import_cache",
710                                           sizeof(struct obd_import),
711                                           0, 0, NULL);
712         if (!import_cachep)
713                 GOTO(out, rc = -ENOMEM);
714
715         LASSERT(capa_cachep == NULL);
716         capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
717                                         0, 0, NULL);
718         if (!capa_cachep)
719                 GOTO(out, rc = -ENOMEM);
720
721         RETURN(0);
722 out:
723         obd_cleanup_caches();
724         RETURN(rc);
725 }
726
727 /* map connection to client */
728 struct obd_export *class_conn2export(struct lustre_handle *conn)
729 {
730         struct obd_export *export;
731         ENTRY;
732
733         if (!conn) {
734                 CDEBUG(D_CACHE, "looking for null handle\n");
735                 RETURN(NULL);
736         }
737
738         if (conn->cookie == -1) {  /* this means assign a new connection */
739                 CDEBUG(D_CACHE, "want a new connection\n");
740                 RETURN(NULL);
741         }
742
743         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
744         export = class_handle2object(conn->cookie, NULL);
745         RETURN(export);
746 }
747 EXPORT_SYMBOL(class_conn2export);
748
749 struct obd_device *class_exp2obd(struct obd_export *exp)
750 {
751         if (exp)
752                 return exp->exp_obd;
753         return NULL;
754 }
755 EXPORT_SYMBOL(class_exp2obd);
756
757 struct obd_device *class_conn2obd(struct lustre_handle *conn)
758 {
759         struct obd_export *export;
760         export = class_conn2export(conn);
761         if (export) {
762                 struct obd_device *obd = export->exp_obd;
763                 class_export_put(export);
764                 return obd;
765         }
766         return NULL;
767 }
768 EXPORT_SYMBOL(class_conn2obd);
769
770 struct obd_import *class_exp2cliimp(struct obd_export *exp)
771 {
772         struct obd_device *obd = exp->exp_obd;
773         if (obd == NULL)
774                 return NULL;
775         return obd->u.cli.cl_import;
776 }
777 EXPORT_SYMBOL(class_exp2cliimp);
778
779 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
780 {
781         struct obd_device *obd = class_conn2obd(conn);
782         if (obd == NULL)
783                 return NULL;
784         return obd->u.cli.cl_import;
785 }
786 EXPORT_SYMBOL(class_conn2cliimp);
787
788 /* Export management functions */
789 static void class_export_destroy(struct obd_export *exp)
790 {
791         struct obd_device *obd = exp->exp_obd;
792         ENTRY;
793
794         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
795         LASSERT(obd != NULL);
796
797         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
798                exp->exp_client_uuid.uuid, obd->obd_name);
799
800         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
801         if (exp->exp_connection)
802                 ptlrpc_put_connection_superhack(exp->exp_connection);
803
804         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
805         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
806         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
807         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
808         obd_destroy_export(exp);
809         class_decref(obd, "export", exp);
810
811         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
812         EXIT;
813 }
814
815 static void export_handle_addref(void *export)
816 {
817         class_export_get(export);
818 }
819
820 static struct portals_handle_ops export_handle_ops = {
821         .hop_addref = export_handle_addref,
822         .hop_free   = NULL,
823 };
824
825 struct obd_export *class_export_get(struct obd_export *exp)
826 {
827         atomic_inc(&exp->exp_refcount);
828         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
829                atomic_read(&exp->exp_refcount));
830         return exp;
831 }
832 EXPORT_SYMBOL(class_export_get);
833
834 void class_export_put(struct obd_export *exp)
835 {
836         LASSERT(exp != NULL);
837         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
838         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
839                atomic_read(&exp->exp_refcount) - 1);
840
841         if (atomic_dec_and_test(&exp->exp_refcount)) {
842                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
843                 CDEBUG(D_IOCTL, "final put %p/%s\n",
844                        exp, exp->exp_client_uuid.uuid);
845
846                 /* release nid stat refererence */
847                 lprocfs_exp_cleanup(exp);
848
849                 obd_zombie_export_add(exp);
850         }
851 }
852 EXPORT_SYMBOL(class_export_put);
853
854 /* Creates a new export, adds it to the hash table, and returns a
855  * pointer to it. The refcount is 2: one for the hash reference, and
856  * one for the pointer returned by this function. */
857 struct obd_export *class_new_export(struct obd_device *obd,
858                                     struct obd_uuid *cluuid)
859 {
860         struct obd_export *export;
861         cfs_hash_t *hash = NULL;
862         int rc = 0;
863         ENTRY;
864
865         OBD_ALLOC_PTR(export);
866         if (!export)
867                 return ERR_PTR(-ENOMEM);
868
869         export->exp_conn_cnt = 0;
870         export->exp_lock_hash = NULL;
871         export->exp_flock_hash = NULL;
872         atomic_set(&export->exp_refcount, 2);
873         atomic_set(&export->exp_rpc_count, 0);
874         atomic_set(&export->exp_cb_count, 0);
875         atomic_set(&export->exp_locks_count, 0);
876 #if LUSTRE_TRACKS_LOCK_EXP_REFS
877         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
878         spin_lock_init(&export->exp_locks_list_guard);
879 #endif
880         atomic_set(&export->exp_replay_count, 0);
881         export->exp_obd = obd;
882         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
883         spin_lock_init(&export->exp_uncommitted_replies_lock);
884         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
885         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
886         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
887         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
888         CFS_INIT_LIST_HEAD(&export->exp_reg_rpcs);
889         class_handle_hash(&export->exp_handle, &export_handle_ops);
890         export->exp_last_request_time = cfs_time_current_sec();
891         spin_lock_init(&export->exp_lock);
892         spin_lock_init(&export->exp_rpc_lock);
893         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
894         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
895         spin_lock_init(&export->exp_bl_list_lock);
896         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
897
898         export->exp_sp_peer = LUSTRE_SP_ANY;
899         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
900         export->exp_client_uuid = *cluuid;
901         obd_init_export(export);
902
903         spin_lock(&obd->obd_dev_lock);
904         /* shouldn't happen, but might race */
905         if (obd->obd_stopping)
906                 GOTO(exit_unlock, rc = -ENODEV);
907
908         hash = cfs_hash_getref(obd->obd_uuid_hash);
909         if (hash == NULL)
910                 GOTO(exit_unlock, rc = -ENODEV);
911         spin_unlock(&obd->obd_dev_lock);
912
913         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
914                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
915                 if (rc != 0) {
916                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
917                                       obd->obd_name, cluuid->uuid, rc);
918                         GOTO(exit_err, rc = -EALREADY);
919                 }
920         }
921
922         spin_lock(&obd->obd_dev_lock);
923         if (obd->obd_stopping) {
924                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
925                 GOTO(exit_unlock, rc = -ENODEV);
926         }
927
928         class_incref(obd, "export", export);
929         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
930         cfs_list_add_tail(&export->exp_obd_chain_timed,
931                           &export->exp_obd->obd_exports_timed);
932         export->exp_obd->obd_num_exports++;
933         spin_unlock(&obd->obd_dev_lock);
934         cfs_hash_putref(hash);
935         RETURN(export);
936
937 exit_unlock:
938         spin_unlock(&obd->obd_dev_lock);
939 exit_err:
940         if (hash)
941                 cfs_hash_putref(hash);
942         class_handle_unhash(&export->exp_handle);
943         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
944         obd_destroy_export(export);
945         OBD_FREE_PTR(export);
946         return ERR_PTR(rc);
947 }
948 EXPORT_SYMBOL(class_new_export);
949
950 void class_unlink_export(struct obd_export *exp)
951 {
952         class_handle_unhash(&exp->exp_handle);
953
954         spin_lock(&exp->exp_obd->obd_dev_lock);
955         /* delete an uuid-export hashitem from hashtables */
956         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
957                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
958                              &exp->exp_client_uuid,
959                              &exp->exp_uuid_hash);
960
961         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
962         cfs_list_del_init(&exp->exp_obd_chain_timed);
963         exp->exp_obd->obd_num_exports--;
964         spin_unlock(&exp->exp_obd->obd_dev_lock);
965         class_export_put(exp);
966 }
967 EXPORT_SYMBOL(class_unlink_export);
968
969 /* Import management functions */
970 void class_import_destroy(struct obd_import *imp)
971 {
972         ENTRY;
973
974         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
975                 imp->imp_obd->obd_name);
976
977         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
978
979         ptlrpc_put_connection_superhack(imp->imp_connection);
980
981         while (!cfs_list_empty(&imp->imp_conn_list)) {
982                 struct obd_import_conn *imp_conn;
983
984                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
985                                           struct obd_import_conn, oic_item);
986                 cfs_list_del_init(&imp_conn->oic_item);
987                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
988                 OBD_FREE(imp_conn, sizeof(*imp_conn));
989         }
990
991         LASSERT(imp->imp_sec == NULL);
992         class_decref(imp->imp_obd, "import", imp);
993         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
994         EXIT;
995 }
996
997 static void import_handle_addref(void *import)
998 {
999         class_import_get(import);
1000 }
1001
1002 static struct portals_handle_ops import_handle_ops = {
1003         .hop_addref = import_handle_addref,
1004         .hop_free   = NULL,
1005 };
1006
1007 struct obd_import *class_import_get(struct obd_import *import)
1008 {
1009         atomic_inc(&import->imp_refcount);
1010         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1011                atomic_read(&import->imp_refcount),
1012                import->imp_obd->obd_name);
1013         return import;
1014 }
1015 EXPORT_SYMBOL(class_import_get);
1016
1017 void class_import_put(struct obd_import *imp)
1018 {
1019         ENTRY;
1020
1021         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1022         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1023
1024         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1025                atomic_read(&imp->imp_refcount) - 1,
1026                imp->imp_obd->obd_name);
1027
1028         if (atomic_dec_and_test(&imp->imp_refcount)) {
1029                 CDEBUG(D_INFO, "final put import %p\n", imp);
1030                 obd_zombie_import_add(imp);
1031         }
1032
1033         /* catch possible import put race */
1034         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1035         EXIT;
1036 }
1037 EXPORT_SYMBOL(class_import_put);
1038
1039 static void init_imp_at(struct imp_at *at) {
1040         int i;
1041         at_init(&at->iat_net_latency, 0, 0);
1042         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1043                 /* max service estimates are tracked on the server side, so
1044                    don't use the AT history here, just use the last reported
1045                    val. (But keep hist for proc histogram, worst_ever) */
1046                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1047                         AT_FLG_NOHIST);
1048         }
1049 }
1050
1051 struct obd_import *class_new_import(struct obd_device *obd)
1052 {
1053         struct obd_import *imp;
1054
1055         OBD_ALLOC(imp, sizeof(*imp));
1056         if (imp == NULL)
1057                 return NULL;
1058
1059         CFS_INIT_LIST_HEAD(&imp->imp_pinger_chain);
1060         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1061         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1062         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1063         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1064         CFS_INIT_LIST_HEAD(&imp->imp_committed_list);
1065         imp->imp_replay_cursor = &imp->imp_committed_list;
1066         spin_lock_init(&imp->imp_lock);
1067         imp->imp_last_success_conn = 0;
1068         imp->imp_state = LUSTRE_IMP_NEW;
1069         imp->imp_obd = class_incref(obd, "import", imp);
1070         mutex_init(&imp->imp_sec_mutex);
1071         init_waitqueue_head(&imp->imp_recovery_waitq);
1072
1073         atomic_set(&imp->imp_refcount, 2);
1074         atomic_set(&imp->imp_unregistering, 0);
1075         atomic_set(&imp->imp_inflight, 0);
1076         atomic_set(&imp->imp_replay_inflight, 0);
1077         atomic_set(&imp->imp_inval_count, 0);
1078         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1079         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1080         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1081         init_imp_at(&imp->imp_at);
1082
1083         /* the default magic is V2, will be used in connect RPC, and
1084          * then adjusted according to the flags in request/reply. */
1085         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1086
1087         return imp;
1088 }
1089 EXPORT_SYMBOL(class_new_import);
1090
1091 void class_destroy_import(struct obd_import *import)
1092 {
1093         LASSERT(import != NULL);
1094         LASSERT(import != LP_POISON);
1095
1096         class_handle_unhash(&import->imp_handle);
1097
1098         spin_lock(&import->imp_lock);
1099         import->imp_generation++;
1100         spin_unlock(&import->imp_lock);
1101         class_import_put(import);
1102 }
1103 EXPORT_SYMBOL(class_destroy_import);
1104
1105 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1106
1107 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1108 {
1109         spin_lock(&exp->exp_locks_list_guard);
1110
1111         LASSERT(lock->l_exp_refs_nr >= 0);
1112
1113         if (lock->l_exp_refs_target != NULL &&
1114             lock->l_exp_refs_target != exp) {
1115                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1116                               exp, lock, lock->l_exp_refs_target);
1117         }
1118         if ((lock->l_exp_refs_nr ++) == 0) {
1119                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1120                 lock->l_exp_refs_target = exp;
1121         }
1122         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1123                lock, exp, lock->l_exp_refs_nr);
1124         spin_unlock(&exp->exp_locks_list_guard);
1125 }
1126 EXPORT_SYMBOL(__class_export_add_lock_ref);
1127
1128 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1129 {
1130         spin_lock(&exp->exp_locks_list_guard);
1131         LASSERT(lock->l_exp_refs_nr > 0);
1132         if (lock->l_exp_refs_target != exp) {
1133                 LCONSOLE_WARN("lock %p, "
1134                               "mismatching export pointers: %p, %p\n",
1135                               lock, lock->l_exp_refs_target, exp);
1136         }
1137         if (-- lock->l_exp_refs_nr == 0) {
1138                 cfs_list_del_init(&lock->l_exp_refs_link);
1139                 lock->l_exp_refs_target = NULL;
1140         }
1141         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1142                lock, exp, lock->l_exp_refs_nr);
1143         spin_unlock(&exp->exp_locks_list_guard);
1144 }
1145 EXPORT_SYMBOL(__class_export_del_lock_ref);
1146 #endif
1147
1148 /* A connection defines an export context in which preallocation can
1149    be managed. This releases the export pointer reference, and returns
1150    the export handle, so the export refcount is 1 when this function
1151    returns. */
1152 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1153                   struct obd_uuid *cluuid)
1154 {
1155         struct obd_export *export;
1156         LASSERT(conn != NULL);
1157         LASSERT(obd != NULL);
1158         LASSERT(cluuid != NULL);
1159         ENTRY;
1160
1161         export = class_new_export(obd, cluuid);
1162         if (IS_ERR(export))
1163                 RETURN(PTR_ERR(export));
1164
1165         conn->cookie = export->exp_handle.h_cookie;
1166         class_export_put(export);
1167
1168         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1169                cluuid->uuid, conn->cookie);
1170         RETURN(0);
1171 }
1172 EXPORT_SYMBOL(class_connect);
1173
1174 /* if export is involved in recovery then clean up related things */
1175 void class_export_recovery_cleanup(struct obd_export *exp)
1176 {
1177         struct obd_device *obd = exp->exp_obd;
1178
1179         spin_lock(&obd->obd_recovery_task_lock);
1180         if (exp->exp_delayed)
1181                 obd->obd_delayed_clients--;
1182         if (obd->obd_recovering) {
1183                 if (exp->exp_in_recovery) {
1184                         spin_lock(&exp->exp_lock);
1185                         exp->exp_in_recovery = 0;
1186                         spin_unlock(&exp->exp_lock);
1187                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1188                         atomic_dec(&obd->obd_connected_clients);
1189                 }
1190
1191                 /* if called during recovery then should update
1192                  * obd_stale_clients counter,
1193                  * lightweight exports are not counted */
1194                 if (exp->exp_failed &&
1195                     (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1196                         exp->exp_obd->obd_stale_clients++;
1197         }
1198         spin_unlock(&obd->obd_recovery_task_lock);
1199         /** Cleanup req replay fields */
1200         if (exp->exp_req_replay_needed) {
1201                 spin_lock(&exp->exp_lock);
1202                 exp->exp_req_replay_needed = 0;
1203                 spin_unlock(&exp->exp_lock);
1204                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1205                 atomic_dec(&obd->obd_req_replay_clients);
1206         }
1207         /** Cleanup lock replay data */
1208         if (exp->exp_lock_replay_needed) {
1209                 spin_lock(&exp->exp_lock);
1210                 exp->exp_lock_replay_needed = 0;
1211                 spin_unlock(&exp->exp_lock);
1212                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1213                 atomic_dec(&obd->obd_lock_replay_clients);
1214         }
1215 }
1216
1217 /* This function removes 1-3 references from the export:
1218  * 1 - for export pointer passed
1219  * and if disconnect really need
1220  * 2 - removing from hash
1221  * 3 - in client_unlink_export
1222  * The export pointer passed to this function can destroyed */
1223 int class_disconnect(struct obd_export *export)
1224 {
1225         int already_disconnected;
1226         ENTRY;
1227
1228         if (export == NULL) {
1229                 CWARN("attempting to free NULL export %p\n", export);
1230                 RETURN(-EINVAL);
1231         }
1232
1233         spin_lock(&export->exp_lock);
1234         already_disconnected = export->exp_disconnected;
1235         export->exp_disconnected = 1;
1236         spin_unlock(&export->exp_lock);
1237
1238         /* class_cleanup(), abort_recovery(), and class_fail_export()
1239          * all end up in here, and if any of them race we shouldn't
1240          * call extra class_export_puts(). */
1241         if (already_disconnected) {
1242                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1243                 GOTO(no_disconn, already_disconnected);
1244         }
1245
1246         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1247                export->exp_handle.h_cookie);
1248
1249         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1250                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1251                              &export->exp_connection->c_peer.nid,
1252                              &export->exp_nid_hash);
1253
1254         class_export_recovery_cleanup(export);
1255         class_unlink_export(export);
1256 no_disconn:
1257         class_export_put(export);
1258         RETURN(0);
1259 }
1260 EXPORT_SYMBOL(class_disconnect);
1261
1262 /* Return non-zero for a fully connected export */
1263 int class_connected_export(struct obd_export *exp)
1264 {
1265         int connected = 0;
1266
1267         if (exp) {
1268                 spin_lock(&exp->exp_lock);
1269                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1270                 spin_unlock(&exp->exp_lock);
1271         }
1272         return connected;
1273 }
1274 EXPORT_SYMBOL(class_connected_export);
1275
1276 static void class_disconnect_export_list(cfs_list_t *list,
1277                                          enum obd_option flags)
1278 {
1279         int rc;
1280         struct obd_export *exp;
1281         ENTRY;
1282
1283         /* It's possible that an export may disconnect itself, but
1284          * nothing else will be added to this list. */
1285         while (!cfs_list_empty(list)) {
1286                 exp = cfs_list_entry(list->next, struct obd_export,
1287                                      exp_obd_chain);
1288                 /* need for safe call CDEBUG after obd_disconnect */
1289                 class_export_get(exp);
1290
1291                 spin_lock(&exp->exp_lock);
1292                 exp->exp_flags = flags;
1293                 spin_unlock(&exp->exp_lock);
1294
1295                 if (obd_uuid_equals(&exp->exp_client_uuid,
1296                                     &exp->exp_obd->obd_uuid)) {
1297                         CDEBUG(D_HA,
1298                                "exp %p export uuid == obd uuid, don't discon\n",
1299                                exp);
1300                         /* Need to delete this now so we don't end up pointing
1301                          * to work_list later when this export is cleaned up. */
1302                         cfs_list_del_init(&exp->exp_obd_chain);
1303                         class_export_put(exp);
1304                         continue;
1305                 }
1306
1307                 class_export_get(exp);
1308                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1309                        "last request at "CFS_TIME_T"\n",
1310                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1311                        exp, exp->exp_last_request_time);
1312                 /* release one export reference anyway */
1313                 rc = obd_disconnect(exp);
1314
1315                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1316                        obd_export_nid2str(exp), exp, rc);
1317                 class_export_put(exp);
1318         }
1319         EXIT;
1320 }
1321
1322 void class_disconnect_exports(struct obd_device *obd)
1323 {
1324         cfs_list_t work_list;
1325         ENTRY;
1326
1327         /* Move all of the exports from obd_exports to a work list, en masse. */
1328         CFS_INIT_LIST_HEAD(&work_list);
1329         spin_lock(&obd->obd_dev_lock);
1330         cfs_list_splice_init(&obd->obd_exports, &work_list);
1331         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1332         spin_unlock(&obd->obd_dev_lock);
1333
1334         if (!cfs_list_empty(&work_list)) {
1335                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1336                        "disconnecting them\n", obd->obd_minor, obd);
1337                 class_disconnect_export_list(&work_list,
1338                                              exp_flags_from_obd(obd));
1339         } else
1340                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1341                        obd->obd_minor, obd);
1342         EXIT;
1343 }
1344 EXPORT_SYMBOL(class_disconnect_exports);
1345
1346 /* Remove exports that have not completed recovery.
1347  */
1348 void class_disconnect_stale_exports(struct obd_device *obd,
1349                                     int (*test_export)(struct obd_export *))
1350 {
1351         cfs_list_t work_list;
1352         struct obd_export *exp, *n;
1353         int evicted = 0;
1354         ENTRY;
1355
1356         CFS_INIT_LIST_HEAD(&work_list);
1357         spin_lock(&obd->obd_dev_lock);
1358         cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1359                                      exp_obd_chain) {
1360                 /* don't count self-export as client */
1361                 if (obd_uuid_equals(&exp->exp_client_uuid,
1362                                     &exp->exp_obd->obd_uuid))
1363                         continue;
1364
1365                 /* don't evict clients which have no slot in last_rcvd
1366                  * (e.g. lightweight connection) */
1367                 if (exp->exp_target_data.ted_lr_idx == -1)
1368                         continue;
1369
1370                 spin_lock(&exp->exp_lock);
1371                 if (exp->exp_failed || test_export(exp)) {
1372                         spin_unlock(&exp->exp_lock);
1373                         continue;
1374                 }
1375                 exp->exp_failed = 1;
1376                 spin_unlock(&exp->exp_lock);
1377
1378                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1379                 evicted++;
1380                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1381                        obd->obd_name, exp->exp_client_uuid.uuid,
1382                        exp->exp_connection == NULL ? "<unknown>" :
1383                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1384                 print_export_data(exp, "EVICTING", 0);
1385         }
1386         spin_unlock(&obd->obd_dev_lock);
1387
1388         if (evicted)
1389                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1390                               obd->obd_name, evicted);
1391
1392         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1393                                                  OBD_OPT_ABORT_RECOV);
1394         EXIT;
1395 }
1396 EXPORT_SYMBOL(class_disconnect_stale_exports);
1397
1398 void class_fail_export(struct obd_export *exp)
1399 {
1400         int rc, already_failed;
1401
1402         spin_lock(&exp->exp_lock);
1403         already_failed = exp->exp_failed;
1404         exp->exp_failed = 1;
1405         spin_unlock(&exp->exp_lock);
1406
1407         if (already_failed) {
1408                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1409                        exp, exp->exp_client_uuid.uuid);
1410                 return;
1411         }
1412
1413         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1414                exp, exp->exp_client_uuid.uuid);
1415
1416         if (obd_dump_on_timeout)
1417                 libcfs_debug_dumplog();
1418
1419         /* need for safe call CDEBUG after obd_disconnect */
1420         class_export_get(exp);
1421
1422         /* Most callers into obd_disconnect are removing their own reference
1423          * (request, for example) in addition to the one from the hash table.
1424          * We don't have such a reference here, so make one. */
1425         class_export_get(exp);
1426         rc = obd_disconnect(exp);
1427         if (rc)
1428                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1429         else
1430                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1431                        exp, exp->exp_client_uuid.uuid);
1432         class_export_put(exp);
1433 }
1434 EXPORT_SYMBOL(class_fail_export);
1435
1436 char *obd_export_nid2str(struct obd_export *exp)
1437 {
1438         if (exp->exp_connection != NULL)
1439                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1440
1441         return "(no nid)";
1442 }
1443 EXPORT_SYMBOL(obd_export_nid2str);
1444
1445 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1446 {
1447         cfs_hash_t *nid_hash;
1448         struct obd_export *doomed_exp = NULL;
1449         int exports_evicted = 0;
1450
1451         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1452
1453         spin_lock(&obd->obd_dev_lock);
1454         /* umount has run already, so evict thread should leave
1455          * its task to umount thread now */
1456         if (obd->obd_stopping) {
1457                 spin_unlock(&obd->obd_dev_lock);
1458                 return exports_evicted;
1459         }
1460         nid_hash = obd->obd_nid_hash;
1461         cfs_hash_getref(nid_hash);
1462         spin_unlock(&obd->obd_dev_lock);
1463
1464         do {
1465                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1466                 if (doomed_exp == NULL)
1467                         break;
1468
1469                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1470                          "nid %s found, wanted nid %s, requested nid %s\n",
1471                          obd_export_nid2str(doomed_exp),
1472                          libcfs_nid2str(nid_key), nid);
1473                 LASSERTF(doomed_exp != obd->obd_self_export,
1474                          "self-export is hashed by NID?\n");
1475                 exports_evicted++;
1476                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1477                               "request\n", obd->obd_name,
1478                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1479                               obd_export_nid2str(doomed_exp));
1480                 class_fail_export(doomed_exp);
1481                 class_export_put(doomed_exp);
1482         } while (1);
1483
1484         cfs_hash_putref(nid_hash);
1485
1486         if (!exports_evicted)
1487                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1488                        obd->obd_name, nid);
1489         return exports_evicted;
1490 }
1491 EXPORT_SYMBOL(obd_export_evict_by_nid);
1492
1493 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1494 {
1495         cfs_hash_t *uuid_hash;
1496         struct obd_export *doomed_exp = NULL;
1497         struct obd_uuid doomed_uuid;
1498         int exports_evicted = 0;
1499
1500         spin_lock(&obd->obd_dev_lock);
1501         if (obd->obd_stopping) {
1502                 spin_unlock(&obd->obd_dev_lock);
1503                 return exports_evicted;
1504         }
1505         uuid_hash = obd->obd_uuid_hash;
1506         cfs_hash_getref(uuid_hash);
1507         spin_unlock(&obd->obd_dev_lock);
1508
1509         obd_str2uuid(&doomed_uuid, uuid);
1510         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1511                 CERROR("%s: can't evict myself\n", obd->obd_name);
1512                 cfs_hash_putref(uuid_hash);
1513                 return exports_evicted;
1514         }
1515
1516         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1517
1518         if (doomed_exp == NULL) {
1519                 CERROR("%s: can't disconnect %s: no exports found\n",
1520                        obd->obd_name, uuid);
1521         } else {
1522                 CWARN("%s: evicting %s at adminstrative request\n",
1523                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1524                 class_fail_export(doomed_exp);
1525                 class_export_put(doomed_exp);
1526                 exports_evicted++;
1527         }
1528         cfs_hash_putref(uuid_hash);
1529
1530         return exports_evicted;
1531 }
1532 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1533
1534 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1535 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1536 EXPORT_SYMBOL(class_export_dump_hook);
1537 #endif
1538
1539 static void print_export_data(struct obd_export *exp, const char *status,
1540                               int locks)
1541 {
1542         struct ptlrpc_reply_state *rs;
1543         struct ptlrpc_reply_state *first_reply = NULL;
1544         int nreplies = 0;
1545
1546         spin_lock(&exp->exp_lock);
1547         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1548                                 rs_exp_list) {
1549                 if (nreplies == 0)
1550                         first_reply = rs;
1551                 nreplies++;
1552         }
1553         spin_unlock(&exp->exp_lock);
1554
1555         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1556                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1557                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1558                atomic_read(&exp->exp_rpc_count),
1559                atomic_read(&exp->exp_cb_count),
1560                atomic_read(&exp->exp_locks_count),
1561                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1562                nreplies, first_reply, nreplies > 3 ? "..." : "",
1563                exp->exp_last_committed);
1564 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1565         if (locks && class_export_dump_hook != NULL)
1566                 class_export_dump_hook(exp);
1567 #endif
1568 }
1569
1570 void dump_exports(struct obd_device *obd, int locks)
1571 {
1572         struct obd_export *exp;
1573
1574         spin_lock(&obd->obd_dev_lock);
1575         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1576                 print_export_data(exp, "ACTIVE", locks);
1577         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1578                 print_export_data(exp, "UNLINKED", locks);
1579         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1580                 print_export_data(exp, "DELAYED", locks);
1581         spin_unlock(&obd->obd_dev_lock);
1582         spin_lock(&obd_zombie_impexp_lock);
1583         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1584                 print_export_data(exp, "ZOMBIE", locks);
1585         spin_unlock(&obd_zombie_impexp_lock);
1586 }
1587 EXPORT_SYMBOL(dump_exports);
1588
1589 void obd_exports_barrier(struct obd_device *obd)
1590 {
1591         int waited = 2;
1592         LASSERT(cfs_list_empty(&obd->obd_exports));
1593         spin_lock(&obd->obd_dev_lock);
1594         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1595                 spin_unlock(&obd->obd_dev_lock);
1596                 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1597                                                    cfs_time_seconds(waited));
1598                 if (waited > 5 && IS_PO2(waited)) {
1599                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1600                                       "more than %d seconds. "
1601                                       "The obd refcount = %d. Is it stuck?\n",
1602                                       obd->obd_name, waited,
1603                                       atomic_read(&obd->obd_refcount));
1604                         dump_exports(obd, 1);
1605                 }
1606                 waited *= 2;
1607                 spin_lock(&obd->obd_dev_lock);
1608         }
1609         spin_unlock(&obd->obd_dev_lock);
1610 }
1611 EXPORT_SYMBOL(obd_exports_barrier);
1612
/* Number of zombie imports and exports still waiting to be destroyed;
 * modified with obd_zombie_impexp_lock held. */
static int zombies_count;
1615
1616 /**
1617  * kill zombie imports and exports
1618  */
1619 void obd_zombie_impexp_cull(void)
1620 {
1621         struct obd_import *import;
1622         struct obd_export *export;
1623         ENTRY;
1624
1625         do {
1626                 spin_lock(&obd_zombie_impexp_lock);
1627
1628                 import = NULL;
1629                 if (!cfs_list_empty(&obd_zombie_imports)) {
1630                         import = cfs_list_entry(obd_zombie_imports.next,
1631                                                 struct obd_import,
1632                                                 imp_zombie_chain);
1633                         cfs_list_del_init(&import->imp_zombie_chain);
1634                 }
1635
1636                 export = NULL;
1637                 if (!cfs_list_empty(&obd_zombie_exports)) {
1638                         export = cfs_list_entry(obd_zombie_exports.next,
1639                                                 struct obd_export,
1640                                                 exp_obd_chain);
1641                         cfs_list_del_init(&export->exp_obd_chain);
1642                 }
1643
1644                 spin_unlock(&obd_zombie_impexp_lock);
1645
1646                 if (import != NULL) {
1647                         class_import_destroy(import);
1648                         spin_lock(&obd_zombie_impexp_lock);
1649                         zombies_count--;
1650                         spin_unlock(&obd_zombie_impexp_lock);
1651                 }
1652
1653                 if (export != NULL) {
1654                         class_export_destroy(export);
1655                         spin_lock(&obd_zombie_impexp_lock);
1656                         zombies_count--;
1657                         spin_unlock(&obd_zombie_impexp_lock);
1658                 }
1659
1660                 cond_resched();
1661         } while (import != NULL || export != NULL);
1662         EXIT;
1663 }
1664
1665 static struct completion        obd_zombie_start;
1666 static struct completion        obd_zombie_stop;
1667 static unsigned long            obd_zombie_flags;
1668 static wait_queue_head_t        obd_zombie_waitq;
1669 static pid_t                    obd_zombie_pid;
1670
1671 enum {
1672         OBD_ZOMBIE_STOP         = 0x0001,
1673 };
1674
1675 /**
1676  * check for work for kill zombie import/export thread.
1677  */
1678 static int obd_zombie_impexp_check(void *arg)
1679 {
1680         int rc;
1681
1682         spin_lock(&obd_zombie_impexp_lock);
1683         rc = (zombies_count == 0) &&
1684              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1685         spin_unlock(&obd_zombie_impexp_lock);
1686
1687         RETURN(rc);
1688 }
1689
1690 /**
1691  * Add export to the obd_zombe thread and notify it.
1692  */
1693 static void obd_zombie_export_add(struct obd_export *exp) {
1694         spin_lock(&exp->exp_obd->obd_dev_lock);
1695         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1696         cfs_list_del_init(&exp->exp_obd_chain);
1697         spin_unlock(&exp->exp_obd->obd_dev_lock);
1698         spin_lock(&obd_zombie_impexp_lock);
1699         zombies_count++;
1700         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1701         spin_unlock(&obd_zombie_impexp_lock);
1702
1703         obd_zombie_impexp_notify();
1704 }
1705
1706 /**
1707  * Add import to the obd_zombe thread and notify it.
1708  */
1709 static void obd_zombie_import_add(struct obd_import *imp) {
1710         LASSERT(imp->imp_sec == NULL);
1711         LASSERT(imp->imp_rq_pool == NULL);
1712         spin_lock(&obd_zombie_impexp_lock);
1713         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1714         zombies_count++;
1715         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1716         spin_unlock(&obd_zombie_impexp_lock);
1717
1718         obd_zombie_impexp_notify();
1719 }
1720
1721 /**
1722  * notify import/export destroy thread about new zombie.
1723  */
1724 static void obd_zombie_impexp_notify(void)
1725 {
1726         /*
1727          * Make sure obd_zomebie_impexp_thread get this notification.
1728          * It is possible this signal only get by obd_zombie_barrier, and
1729          * barrier gulps this notification and sleeps away and hangs ensues
1730          */
1731         wake_up_all(&obd_zombie_waitq);
1732 }
1733
1734 /**
1735  * check whether obd_zombie is idle
1736  */
1737 static int obd_zombie_is_idle(void)
1738 {
1739         int rc;
1740
1741         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1742         spin_lock(&obd_zombie_impexp_lock);
1743         rc = (zombies_count == 0);
1744         spin_unlock(&obd_zombie_impexp_lock);
1745         return rc;
1746 }
1747
1748 /**
1749  * wait when obd_zombie import/export queues become empty
1750  */
1751 void obd_zombie_barrier(void)
1752 {
1753         struct l_wait_info lwi = { 0 };
1754
1755         if (obd_zombie_pid == current_pid())
1756                 /* don't wait for myself */
1757                 return;
1758         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1759 }
1760 EXPORT_SYMBOL(obd_zombie_barrier);
1761
1762 #ifdef __KERNEL__
1763
1764 /**
1765  * destroy zombie export/import thread.
1766  */
1767 static int obd_zombie_impexp_thread(void *unused)
1768 {
1769         unshare_fs_struct();
1770         complete(&obd_zombie_start);
1771
1772         obd_zombie_pid = current_pid();
1773
1774         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1775                 struct l_wait_info lwi = { 0 };
1776
1777                 l_wait_event(obd_zombie_waitq,
1778                              !obd_zombie_impexp_check(NULL), &lwi);
1779                 obd_zombie_impexp_cull();
1780
1781                 /*
1782                  * Notify obd_zombie_barrier callers that queues
1783                  * may be empty.
1784                  */
1785                 wake_up(&obd_zombie_waitq);
1786         }
1787
1788         complete(&obd_zombie_stop);
1789
1790         RETURN(0);
1791 }
1792
1793 #else /* ! KERNEL */
1794
1795 static atomic_t zombie_recur = ATOMIC_INIT(0);
1796 static void *obd_zombie_impexp_work_cb;
1797 static void *obd_zombie_impexp_idle_cb;
1798
1799 int obd_zombie_impexp_kill(void *arg)
1800 {
1801         int rc = 0;
1802
1803         if (atomic_inc_return(&zombie_recur) == 1) {
1804                 obd_zombie_impexp_cull();
1805                 rc = 1;
1806         }
1807         atomic_dec(&zombie_recur);
1808         return rc;
1809 }
1810
1811 #endif
1812
1813 /**
1814  * start destroy zombie import/export thread
1815  */
1816 int obd_zombie_impexp_init(void)
1817 {
1818 #ifdef __KERNEL__
1819         struct task_struct *task;
1820 #endif
1821
1822         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1823         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1824         spin_lock_init(&obd_zombie_impexp_lock);
1825         init_completion(&obd_zombie_start);
1826         init_completion(&obd_zombie_stop);
1827         init_waitqueue_head(&obd_zombie_waitq);
1828         obd_zombie_pid = 0;
1829
1830 #ifdef __KERNEL__
1831         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1832         if (IS_ERR(task))
1833                 RETURN(PTR_ERR(task));
1834
1835         wait_for_completion(&obd_zombie_start);
1836 #else
1837
1838         obd_zombie_impexp_work_cb =
1839                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1840                                                  &obd_zombie_impexp_kill, NULL);
1841
1842         obd_zombie_impexp_idle_cb =
1843                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1844                                                  &obd_zombie_impexp_check, NULL);
1845 #endif
1846         RETURN(0);
1847 }
1848 /**
1849  * stop destroy zombie import/export thread
1850  */
1851 void obd_zombie_impexp_stop(void)
1852 {
1853         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1854         obd_zombie_impexp_notify();
1855 #ifdef __KERNEL__
1856         wait_for_completion(&obd_zombie_stop);
1857 #else
1858         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1859         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1860 #endif
1861 }
1862
1863 /***** Kernel-userspace comm helpers *******/
1864
1865 /* Get length of entire message, including header */
1866 int kuc_len(int payload_len)
1867 {
1868         return sizeof(struct kuc_hdr) + payload_len;
1869 }
1870 EXPORT_SYMBOL(kuc_len);
1871
1872 /* Get a pointer to kuc header, given a ptr to the payload
1873  * @param p Pointer to payload area
1874  * @returns Pointer to kuc header
1875  */
1876 struct kuc_hdr * kuc_ptr(void *p)
1877 {
1878         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1879         LASSERT(lh->kuc_magic == KUC_MAGIC);
1880         return lh;
1881 }
1882 EXPORT_SYMBOL(kuc_ptr);
1883
1884 /* Test if payload is part of kuc message
1885  * @param p Pointer to payload area
1886  * @returns boolean
1887  */
1888 int kuc_ispayload(void *p)
1889 {
1890         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1891
1892         if (kh->kuc_magic == KUC_MAGIC)
1893                 return 1;
1894         else
1895                 return 0;
1896 }
1897 EXPORT_SYMBOL(kuc_ispayload);
1898
1899 /* Alloc space for a message, and fill in header
1900  * @return Pointer to payload area
1901  */
1902 void *kuc_alloc(int payload_len, int transport, int type)
1903 {
1904         struct kuc_hdr *lh;
1905         int len = kuc_len(payload_len);
1906
1907         OBD_ALLOC(lh, len);
1908         if (lh == NULL)
1909                 return ERR_PTR(-ENOMEM);
1910
1911         lh->kuc_magic = KUC_MAGIC;
1912         lh->kuc_transport = transport;
1913         lh->kuc_msgtype = type;
1914         lh->kuc_msglen = len;
1915
1916         return (void *)(lh + 1);
1917 }
1918 EXPORT_SYMBOL(kuc_alloc);
1919
/* Free a kuc message
 * @param p Pointer to payload area (the header is located via kuc_ptr(),
 *          which asserts the kuc magic)
 * @param payload_len Payload size in bytes; the size freed is
 *                    kuc_len(payload_len)
 *
 * Deliberately not declared "inline": the function is exported and has
 * external linkage, and a bare "inline" definition is not guaranteed to
 * emit an external definition under C99 inline semantics.
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1927
1928
1929