Whamcloud - gitweb
LU-3319 procfs: fix symlink handling
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_class.h>
47 #include <lprocfs_status.h>
48
49 extern cfs_list_t obd_types;
50 spinlock_t obd_types_lock;
51
52 struct kmem_cache *obd_device_cachep;
53 struct kmem_cache *obdo_cachep;
54 EXPORT_SYMBOL(obdo_cachep);
55 struct kmem_cache *import_cachep;
56
57 cfs_list_t      obd_zombie_imports;
58 cfs_list_t      obd_zombie_exports;
59 spinlock_t  obd_zombie_impexp_lock;
60 static void obd_zombie_impexp_notify(void);
61 static void obd_zombie_export_add(struct obd_export *exp);
62 static void obd_zombie_import_add(struct obd_import *imp);
63 static void print_export_data(struct obd_export *exp,
64                               const char *status, int locks);
65
66 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
68
69 /*
70  * support functions: we could use inter-module communication, but this
71  * is more portable to other OS's
72  */
73 static struct obd_device *obd_device_alloc(void)
74 {
75         struct obd_device *obd;
76
77         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
78         if (obd != NULL) {
79                 obd->obd_magic = OBD_DEVICE_MAGIC;
80         }
81         return obd;
82 }
83
84 static void obd_device_free(struct obd_device *obd)
85 {
86         LASSERT(obd != NULL);
87         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
89         if (obd->obd_namespace != NULL) {
90                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91                        obd, obd->obd_namespace, obd->obd_force);
92                 LBUG();
93         }
94         lu_ref_fini(&obd->obd_reference);
95         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
96 }
97
98 struct obd_type *class_search_type(const char *name)
99 {
100         cfs_list_t *tmp;
101         struct obd_type *type;
102
103         spin_lock(&obd_types_lock);
104         cfs_list_for_each(tmp, &obd_types) {
105                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
106                 if (strcmp(type->typ_name, name) == 0) {
107                         spin_unlock(&obd_types_lock);
108                         return type;
109                 }
110         }
111         spin_unlock(&obd_types_lock);
112         return NULL;
113 }
114 EXPORT_SYMBOL(class_search_type);
115
116 struct obd_type *class_get_type(const char *name)
117 {
118         struct obd_type *type = class_search_type(name);
119
120 #ifdef HAVE_MODULE_LOADING_SUPPORT
121         if (!type) {
122                 const char *modname = name;
123
124                 if (strcmp(modname, "obdfilter") == 0)
125                         modname = "ofd";
126
127                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
128                         modname = LUSTRE_OSP_NAME;
129
130                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
131                         modname = LUSTRE_MDT_NAME;
132
133                 if (!request_module("%s", modname)) {
134                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
135                         type = class_search_type(name);
136                 } else {
137                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
138                                            modname);
139                 }
140         }
141 #endif
142         if (type) {
143                 spin_lock(&type->obd_type_lock);
144                 type->typ_refcnt++;
145                 try_module_get(type->typ_dt_ops->o_owner);
146                 spin_unlock(&type->obd_type_lock);
147         }
148         return type;
149 }
150 EXPORT_SYMBOL(class_get_type);
151
152 void class_put_type(struct obd_type *type)
153 {
154         LASSERT(type);
155         spin_lock(&type->obd_type_lock);
156         type->typ_refcnt--;
157         module_put(type->typ_dt_ops->o_owner);
158         spin_unlock(&type->obd_type_lock);
159 }
160 EXPORT_SYMBOL(class_put_type);
161
162 #define CLASS_MAX_NAME 1024
163
164 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
165                         bool enable_proc, struct lprocfs_seq_vars *module_vars,
166 #ifndef HAVE_ONLY_PROCFS_SEQ
167                         struct lprocfs_vars *vars,
168 #endif
169                         const char *name, struct lu_device_type *ldt)
170 {
171         struct obd_type *type;
172         int rc = 0;
173         ENTRY;
174
175         /* sanity check */
176         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
177
178         if (class_search_type(name)) {
179                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
180                 RETURN(-EEXIST);
181         }
182
183         rc = -ENOMEM;
184         OBD_ALLOC(type, sizeof(*type));
185         if (type == NULL)
186                 RETURN(rc);
187
188         OBD_ALLOC_PTR(type->typ_dt_ops);
189         OBD_ALLOC_PTR(type->typ_md_ops);
190         OBD_ALLOC(type->typ_name, strlen(name) + 1);
191
192         if (type->typ_dt_ops == NULL ||
193             type->typ_md_ops == NULL ||
194             type->typ_name == NULL)
195                 GOTO (failed, rc);
196
197         *(type->typ_dt_ops) = *dt_ops;
198         /* md_ops is optional */
199         if (md_ops)
200                 *(type->typ_md_ops) = *md_ops;
201         strcpy(type->typ_name, name);
202         spin_lock_init(&type->obd_type_lock);
203
204 #ifdef LPROCFS
205         if (enable_proc) {
206 #ifndef HAVE_ONLY_PROCFS_SEQ
207                 if (vars) {
208                         type->typ_procroot = lprocfs_register(type->typ_name,
209                                                               proc_lustre_root,
210                                                               vars, type);
211                 } else
212 #endif
213                 {
214                         type->typ_procroot = lprocfs_seq_register(type->typ_name,
215                                                                   proc_lustre_root,
216                                                                   module_vars, type);
217                 }
218                 if (IS_ERR(type->typ_procroot)) {
219                         rc = PTR_ERR(type->typ_procroot);
220                         type->typ_procroot = NULL;
221                         GOTO(failed, rc);
222                 }
223         }
224 #endif
225         if (ldt != NULL) {
226                 type->typ_lu = ldt;
227                 rc = lu_device_type_init(ldt);
228                 if (rc != 0)
229                         GOTO (failed, rc);
230         }
231
232         spin_lock(&obd_types_lock);
233         cfs_list_add(&type->typ_chain, &obd_types);
234         spin_unlock(&obd_types_lock);
235
236         RETURN (0);
237
238 failed:
239         if (type->typ_name != NULL) {
240 #ifdef LPROCFS
241                 if (type->typ_procroot != NULL) {
242 #ifndef HAVE_ONLY_PROCFS_SEQ
243                         lprocfs_try_remove_proc_entry(type->typ_name,
244                                                       proc_lustre_root);
245 #else
246                         remove_proc_subtree(type->typ_name, proc_lustre_root);
247 #endif
248                 }
249 #endif
250                 OBD_FREE(type->typ_name, strlen(name) + 1);
251         }
252         if (type->typ_md_ops != NULL)
253                 OBD_FREE_PTR(type->typ_md_ops);
254         if (type->typ_dt_ops != NULL)
255                 OBD_FREE_PTR(type->typ_dt_ops);
256         OBD_FREE(type, sizeof(*type));
257         RETURN(rc);
258 }
259 EXPORT_SYMBOL(class_register_type);
260
261 int class_unregister_type(const char *name)
262 {
263         struct obd_type *type = class_search_type(name);
264         ENTRY;
265
266         if (!type) {
267                 CERROR("unknown obd type\n");
268                 RETURN(-EINVAL);
269         }
270
271         if (type->typ_refcnt) {
272                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
273                 /* This is a bad situation, let's make the best of it */
274                 /* Remove ops, but leave the name for debugging */
275                 OBD_FREE_PTR(type->typ_dt_ops);
276                 OBD_FREE_PTR(type->typ_md_ops);
277                 RETURN(-EBUSY);
278         }
279
280         /* we do not use type->typ_procroot as for compatibility purposes
281          * other modules can share names (i.e. lod can use lov entry). so
282          * we can't reference pointer as it can get invalided when another
283          * module removes the entry */
284 #ifdef LPROCFS
285         if (type->typ_procroot != NULL) {
286 #ifndef HAVE_ONLY_PROCFS_SEQ
287                 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
288 #else
289                 remove_proc_subtree(type->typ_name, proc_lustre_root);
290 #endif
291         }
292
293         if (type->typ_procsym != NULL)
294                 lprocfs_remove(&type->typ_procsym);
295 #endif
296         if (type->typ_lu)
297                 lu_device_type_fini(type->typ_lu);
298
299         spin_lock(&obd_types_lock);
300         cfs_list_del(&type->typ_chain);
301         spin_unlock(&obd_types_lock);
302         OBD_FREE(type->typ_name, strlen(name) + 1);
303         if (type->typ_dt_ops != NULL)
304                 OBD_FREE_PTR(type->typ_dt_ops);
305         if (type->typ_md_ops != NULL)
306                 OBD_FREE_PTR(type->typ_md_ops);
307         OBD_FREE(type, sizeof(*type));
308         RETURN(0);
309 } /* class_unregister_type */
310 EXPORT_SYMBOL(class_unregister_type);
311
312 /**
313  * Create a new obd device.
314  *
315  * Find an empty slot in ::obd_devs[], create a new obd device in it.
316  *
317  * \param[in] type_name obd device type string.
318  * \param[in] name      obd device name.
319  *
320  * \retval NULL if create fails, otherwise return the obd device
321  *         pointer created.
322  */
323 struct obd_device *class_newdev(const char *type_name, const char *name)
324 {
325         struct obd_device *result = NULL;
326         struct obd_device *newdev;
327         struct obd_type *type = NULL;
328         int i;
329         int new_obd_minor = 0;
330         ENTRY;
331
332         if (strlen(name) >= MAX_OBD_NAME) {
333                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
334                 RETURN(ERR_PTR(-EINVAL));
335         }
336
337         type = class_get_type(type_name);
338         if (type == NULL){
339                 CERROR("OBD: unknown type: %s\n", type_name);
340                 RETURN(ERR_PTR(-ENODEV));
341         }
342
343         newdev = obd_device_alloc();
344         if (newdev == NULL)
345                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
346
347         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
348
349         write_lock(&obd_dev_lock);
350         for (i = 0; i < class_devno_max(); i++) {
351                 struct obd_device *obd = class_num2obd(i);
352
353                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
354                         CERROR("Device %s already exists at %d, won't add\n",
355                                name, i);
356                         if (result) {
357                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
358                                          "%p obd_magic %08x != %08x\n", result,
359                                          result->obd_magic, OBD_DEVICE_MAGIC);
360                                 LASSERTF(result->obd_minor == new_obd_minor,
361                                          "%p obd_minor %d != %d\n", result,
362                                          result->obd_minor, new_obd_minor);
363
364                                 obd_devs[result->obd_minor] = NULL;
365                                 result->obd_name[0]='\0';
366                          }
367                         result = ERR_PTR(-EEXIST);
368                         break;
369                 }
370                 if (!result && !obd) {
371                         result = newdev;
372                         result->obd_minor = i;
373                         new_obd_minor = i;
374                         result->obd_type = type;
375                         strncpy(result->obd_name, name,
376                                 sizeof(result->obd_name) - 1);
377                         obd_devs[i] = result;
378                 }
379         }
380         write_unlock(&obd_dev_lock);
381
382         if (result == NULL && i >= class_devno_max()) {
383                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
384                        class_devno_max());
385                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
386         }
387
388         if (IS_ERR(result))
389                 GOTO(out, result);
390
391         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
392                result->obd_name, result);
393
394         RETURN(result);
395 out:
396         obd_device_free(newdev);
397 out_type:
398         class_put_type(type);
399         return result;
400 }
401
402 void class_release_dev(struct obd_device *obd)
403 {
404         struct obd_type *obd_type = obd->obd_type;
405
406         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
407                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
408         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
409                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
410         LASSERT(obd_type != NULL);
411
412         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
413                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
414
415         write_lock(&obd_dev_lock);
416         obd_devs[obd->obd_minor] = NULL;
417         write_unlock(&obd_dev_lock);
418         obd_device_free(obd);
419
420         class_put_type(obd_type);
421 }
422
423 int class_name2dev(const char *name)
424 {
425         int i;
426
427         if (!name)
428                 return -1;
429
430         read_lock(&obd_dev_lock);
431         for (i = 0; i < class_devno_max(); i++) {
432                 struct obd_device *obd = class_num2obd(i);
433
434                 if (obd && strcmp(name, obd->obd_name) == 0) {
435                         /* Make sure we finished attaching before we give
436                            out any references */
437                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
438                         if (obd->obd_attached) {
439                                 read_unlock(&obd_dev_lock);
440                                 return i;
441                         }
442                         break;
443                 }
444         }
445         read_unlock(&obd_dev_lock);
446
447         return -1;
448 }
449 EXPORT_SYMBOL(class_name2dev);
450
451 struct obd_device *class_name2obd(const char *name)
452 {
453         int dev = class_name2dev(name);
454
455         if (dev < 0 || dev > class_devno_max())
456                 return NULL;
457         return class_num2obd(dev);
458 }
459 EXPORT_SYMBOL(class_name2obd);
460
461 int class_uuid2dev(struct obd_uuid *uuid)
462 {
463         int i;
464
465         read_lock(&obd_dev_lock);
466         for (i = 0; i < class_devno_max(); i++) {
467                 struct obd_device *obd = class_num2obd(i);
468
469                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
470                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
471                         read_unlock(&obd_dev_lock);
472                         return i;
473                 }
474         }
475         read_unlock(&obd_dev_lock);
476
477         return -1;
478 }
479 EXPORT_SYMBOL(class_uuid2dev);
480
481 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
482 {
483         int dev = class_uuid2dev(uuid);
484         if (dev < 0)
485                 return NULL;
486         return class_num2obd(dev);
487 }
488 EXPORT_SYMBOL(class_uuid2obd);
489
490 /**
491  * Get obd device from ::obd_devs[]
492  *
493  * \param num [in] array index
494  *
495  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
496  *         otherwise return the obd device there.
497  */
498 struct obd_device *class_num2obd(int num)
499 {
500         struct obd_device *obd = NULL;
501
502         if (num < class_devno_max()) {
503                 obd = obd_devs[num];
504                 if (obd == NULL)
505                         return NULL;
506
507                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
508                          "%p obd_magic %08x != %08x\n",
509                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
510                 LASSERTF(obd->obd_minor == num,
511                          "%p obd_minor %0d != %0d\n",
512                          obd, obd->obd_minor, num);
513         }
514
515         return obd;
516 }
517 EXPORT_SYMBOL(class_num2obd);
518
519 /**
520  * Get obd devices count. Device in any
521  *    state are counted
522  * \retval obd device count
523  */
524 int get_devices_count(void)
525 {
526         int index, max_index = class_devno_max(), dev_count = 0;
527
528         read_lock(&obd_dev_lock);
529         for (index = 0; index <= max_index; index++) {
530                 struct obd_device *obd = class_num2obd(index);
531                 if (obd != NULL)
532                         dev_count++;
533         }
534         read_unlock(&obd_dev_lock);
535
536         return dev_count;
537 }
538 EXPORT_SYMBOL(get_devices_count);
539
540 void class_obd_list(void)
541 {
542         char *status;
543         int i;
544
545         read_lock(&obd_dev_lock);
546         for (i = 0; i < class_devno_max(); i++) {
547                 struct obd_device *obd = class_num2obd(i);
548
549                 if (obd == NULL)
550                         continue;
551                 if (obd->obd_stopping)
552                         status = "ST";
553                 else if (obd->obd_set_up)
554                         status = "UP";
555                 else if (obd->obd_attached)
556                         status = "AT";
557                 else
558                         status = "--";
559                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
560                          i, status, obd->obd_type->typ_name,
561                          obd->obd_name, obd->obd_uuid.uuid,
562                          atomic_read(&obd->obd_refcount));
563         }
564         read_unlock(&obd_dev_lock);
565         return;
566 }
567
568 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
569    specified, then only the client with that uuid is returned,
570    otherwise any client connected to the tgt is returned. */
571 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
572                                           const char * typ_name,
573                                           struct obd_uuid *grp_uuid)
574 {
575         int i;
576
577         read_lock(&obd_dev_lock);
578         for (i = 0; i < class_devno_max(); i++) {
579                 struct obd_device *obd = class_num2obd(i);
580
581                 if (obd == NULL)
582                         continue;
583                 if ((strncmp(obd->obd_type->typ_name, typ_name,
584                              strlen(typ_name)) == 0)) {
585                         if (obd_uuid_equals(tgt_uuid,
586                                             &obd->u.cli.cl_target_uuid) &&
587                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
588                                                          &obd->obd_uuid) : 1)) {
589                                 read_unlock(&obd_dev_lock);
590                                 return obd;
591                         }
592                 }
593         }
594         read_unlock(&obd_dev_lock);
595
596         return NULL;
597 }
598 EXPORT_SYMBOL(class_find_client_obd);
599
600 /* Iterate the obd_device list looking devices have grp_uuid. Start
601    searching at *next, and if a device is found, the next index to look
602    at is saved in *next. If next is NULL, then the first matching device
603    will always be returned. */
604 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
605 {
606         int i;
607
608         if (next == NULL)
609                 i = 0;
610         else if (*next >= 0 && *next < class_devno_max())
611                 i = *next;
612         else
613                 return NULL;
614
615         read_lock(&obd_dev_lock);
616         for (; i < class_devno_max(); i++) {
617                 struct obd_device *obd = class_num2obd(i);
618
619                 if (obd == NULL)
620                         continue;
621                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
622                         if (next != NULL)
623                                 *next = i+1;
624                         read_unlock(&obd_dev_lock);
625                         return obd;
626                 }
627         }
628         read_unlock(&obd_dev_lock);
629
630         return NULL;
631 }
632 EXPORT_SYMBOL(class_devices_in_group);
633
634 /**
635  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
636  * adjust sptlrpc settings accordingly.
637  */
638 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
639 {
640         struct obd_device  *obd;
641         const char         *type;
642         int                 i, rc = 0, rc2;
643
644         LASSERT(namelen > 0);
645
646         read_lock(&obd_dev_lock);
647         for (i = 0; i < class_devno_max(); i++) {
648                 obd = class_num2obd(i);
649
650                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
651                         continue;
652
653                 /* only notify mdc, osc, mdt, ost */
654                 type = obd->obd_type->typ_name;
655                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
656                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
657                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
658                     strcmp(type, LUSTRE_OST_NAME) != 0)
659                         continue;
660
661                 if (strncmp(obd->obd_name, fsname, namelen))
662                         continue;
663
664                 class_incref(obd, __FUNCTION__, obd);
665                 read_unlock(&obd_dev_lock);
666                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
667                                          sizeof(KEY_SPTLRPC_CONF),
668                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
669                 rc = rc ? rc : rc2;
670                 class_decref(obd, __FUNCTION__, obd);
671                 read_lock(&obd_dev_lock);
672         }
673         read_unlock(&obd_dev_lock);
674         return rc;
675 }
676 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
677
678 void obd_cleanup_caches(void)
679 {
680         ENTRY;
681         if (obd_device_cachep) {
682                 kmem_cache_destroy(obd_device_cachep);
683                 obd_device_cachep = NULL;
684         }
685         if (obdo_cachep) {
686                 kmem_cache_destroy(obdo_cachep);
687                 obdo_cachep = NULL;
688         }
689         if (import_cachep) {
690                 kmem_cache_destroy(import_cachep);
691                 import_cachep = NULL;
692         }
693         if (capa_cachep) {
694                 kmem_cache_destroy(capa_cachep);
695                 capa_cachep = NULL;
696         }
697         EXIT;
698 }
699
700 int obd_init_caches(void)
701 {
702         int rc;
703         ENTRY;
704
705         LASSERT(obd_device_cachep == NULL);
706         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
707                                               sizeof(struct obd_device),
708                                               0, 0, NULL);
709         if (!obd_device_cachep)
710                 GOTO(out, rc = -ENOMEM);
711
712         LASSERT(obdo_cachep == NULL);
713         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
714                                         0, 0, NULL);
715         if (!obdo_cachep)
716                 GOTO(out, rc = -ENOMEM);
717
718         LASSERT(import_cachep == NULL);
719         import_cachep = kmem_cache_create("ll_import_cache",
720                                           sizeof(struct obd_import),
721                                           0, 0, NULL);
722         if (!import_cachep)
723                 GOTO(out, rc = -ENOMEM);
724
725         LASSERT(capa_cachep == NULL);
726         capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
727                                         0, 0, NULL);
728         if (!capa_cachep)
729                 GOTO(out, rc = -ENOMEM);
730
731         RETURN(0);
732 out:
733         obd_cleanup_caches();
734         RETURN(rc);
735 }
736
737 /* map connection to client */
738 struct obd_export *class_conn2export(struct lustre_handle *conn)
739 {
740         struct obd_export *export;
741         ENTRY;
742
743         if (!conn) {
744                 CDEBUG(D_CACHE, "looking for null handle\n");
745                 RETURN(NULL);
746         }
747
748         if (conn->cookie == -1) {  /* this means assign a new connection */
749                 CDEBUG(D_CACHE, "want a new connection\n");
750                 RETURN(NULL);
751         }
752
753         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
754         export = class_handle2object(conn->cookie, NULL);
755         RETURN(export);
756 }
757 EXPORT_SYMBOL(class_conn2export);
758
759 struct obd_device *class_exp2obd(struct obd_export *exp)
760 {
761         if (exp)
762                 return exp->exp_obd;
763         return NULL;
764 }
765 EXPORT_SYMBOL(class_exp2obd);
766
767 struct obd_device *class_conn2obd(struct lustre_handle *conn)
768 {
769         struct obd_export *export;
770         export = class_conn2export(conn);
771         if (export) {
772                 struct obd_device *obd = export->exp_obd;
773                 class_export_put(export);
774                 return obd;
775         }
776         return NULL;
777 }
778 EXPORT_SYMBOL(class_conn2obd);
779
780 struct obd_import *class_exp2cliimp(struct obd_export *exp)
781 {
782         struct obd_device *obd = exp->exp_obd;
783         if (obd == NULL)
784                 return NULL;
785         return obd->u.cli.cl_import;
786 }
787 EXPORT_SYMBOL(class_exp2cliimp);
788
789 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
790 {
791         struct obd_device *obd = class_conn2obd(conn);
792         if (obd == NULL)
793                 return NULL;
794         return obd->u.cli.cl_import;
795 }
796 EXPORT_SYMBOL(class_conn2cliimp);
797
798 /* Export management functions */
799 static void class_export_destroy(struct obd_export *exp)
800 {
801         struct obd_device *obd = exp->exp_obd;
802         ENTRY;
803
804         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
805         LASSERT(obd != NULL);
806
807         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
808                exp->exp_client_uuid.uuid, obd->obd_name);
809
810         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
811         if (exp->exp_connection)
812                 ptlrpc_put_connection_superhack(exp->exp_connection);
813
814         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
815         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
816         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
817         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
818         obd_destroy_export(exp);
819         class_decref(obd, "export", exp);
820
821         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
822         EXIT;
823 }
824
825 static void export_handle_addref(void *export)
826 {
827         class_export_get(export);
828 }
829
830 static struct portals_handle_ops export_handle_ops = {
831         .hop_addref = export_handle_addref,
832         .hop_free   = NULL,
833 };
834
835 struct obd_export *class_export_get(struct obd_export *exp)
836 {
837         atomic_inc(&exp->exp_refcount);
838         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
839                atomic_read(&exp->exp_refcount));
840         return exp;
841 }
842 EXPORT_SYMBOL(class_export_get);
843
844 void class_export_put(struct obd_export *exp)
845 {
846         LASSERT(exp != NULL);
847         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
848         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
849                atomic_read(&exp->exp_refcount) - 1);
850
851         if (atomic_dec_and_test(&exp->exp_refcount)) {
852                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
853                 CDEBUG(D_IOCTL, "final put %p/%s\n",
854                        exp, exp->exp_client_uuid.uuid);
855
856                 /* release nid stat refererence */
857                 lprocfs_exp_cleanup(exp);
858
859                 obd_zombie_export_add(exp);
860         }
861 }
862 EXPORT_SYMBOL(class_export_put);
863
864 /* Creates a new export, adds it to the hash table, and returns a
865  * pointer to it. The refcount is 2: one for the hash reference, and
866  * one for the pointer returned by this function. */
867 struct obd_export *class_new_export(struct obd_device *obd,
868                                     struct obd_uuid *cluuid)
869 {
870         struct obd_export *export;
871         cfs_hash_t *hash = NULL;
872         int rc = 0;
873         ENTRY;
874
875         OBD_ALLOC_PTR(export);
876         if (!export)
877                 return ERR_PTR(-ENOMEM);
878
879         export->exp_conn_cnt = 0;
880         export->exp_lock_hash = NULL;
881         export->exp_flock_hash = NULL;
882         atomic_set(&export->exp_refcount, 2);
883         atomic_set(&export->exp_rpc_count, 0);
884         atomic_set(&export->exp_cb_count, 0);
885         atomic_set(&export->exp_locks_count, 0);
886 #if LUSTRE_TRACKS_LOCK_EXP_REFS
887         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
888         spin_lock_init(&export->exp_locks_list_guard);
889 #endif
890         atomic_set(&export->exp_replay_count, 0);
891         export->exp_obd = obd;
892         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
893         spin_lock_init(&export->exp_uncommitted_replies_lock);
894         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
895         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
896         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
897         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
898         CFS_INIT_LIST_HEAD(&export->exp_reg_rpcs);
899         class_handle_hash(&export->exp_handle, &export_handle_ops);
900         export->exp_last_request_time = cfs_time_current_sec();
901         spin_lock_init(&export->exp_lock);
902         spin_lock_init(&export->exp_rpc_lock);
903         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
904         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
905         spin_lock_init(&export->exp_bl_list_lock);
906         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
907
908         export->exp_sp_peer = LUSTRE_SP_ANY;
909         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
910         export->exp_client_uuid = *cluuid;
911         obd_init_export(export);
912
913         spin_lock(&obd->obd_dev_lock);
914         /* shouldn't happen, but might race */
915         if (obd->obd_stopping)
916                 GOTO(exit_unlock, rc = -ENODEV);
917
918         hash = cfs_hash_getref(obd->obd_uuid_hash);
919         if (hash == NULL)
920                 GOTO(exit_unlock, rc = -ENODEV);
921         spin_unlock(&obd->obd_dev_lock);
922
923         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
924                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
925                 if (rc != 0) {
926                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
927                                       obd->obd_name, cluuid->uuid, rc);
928                         GOTO(exit_err, rc = -EALREADY);
929                 }
930         }
931
932         spin_lock(&obd->obd_dev_lock);
933         if (obd->obd_stopping) {
934                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
935                 GOTO(exit_unlock, rc = -ENODEV);
936         }
937
938         class_incref(obd, "export", export);
939         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
940         cfs_list_add_tail(&export->exp_obd_chain_timed,
941                           &export->exp_obd->obd_exports_timed);
942         export->exp_obd->obd_num_exports++;
943         spin_unlock(&obd->obd_dev_lock);
944         cfs_hash_putref(hash);
945         RETURN(export);
946
947 exit_unlock:
948         spin_unlock(&obd->obd_dev_lock);
949 exit_err:
950         if (hash)
951                 cfs_hash_putref(hash);
952         class_handle_unhash(&export->exp_handle);
953         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
954         obd_destroy_export(export);
955         OBD_FREE_PTR(export);
956         return ERR_PTR(rc);
957 }
958 EXPORT_SYMBOL(class_new_export);
959
960 void class_unlink_export(struct obd_export *exp)
961 {
962         class_handle_unhash(&exp->exp_handle);
963
964         spin_lock(&exp->exp_obd->obd_dev_lock);
965         /* delete an uuid-export hashitem from hashtables */
966         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
967                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
968                              &exp->exp_client_uuid,
969                              &exp->exp_uuid_hash);
970
971         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
972         cfs_list_del_init(&exp->exp_obd_chain_timed);
973         exp->exp_obd->obd_num_exports--;
974         spin_unlock(&exp->exp_obd->obd_dev_lock);
975         class_export_put(exp);
976 }
977 EXPORT_SYMBOL(class_unlink_export);
978
979 /* Import management functions */
980 void class_import_destroy(struct obd_import *imp)
981 {
982         ENTRY;
983
984         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
985                 imp->imp_obd->obd_name);
986
987         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
988
989         ptlrpc_put_connection_superhack(imp->imp_connection);
990
991         while (!cfs_list_empty(&imp->imp_conn_list)) {
992                 struct obd_import_conn *imp_conn;
993
994                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
995                                           struct obd_import_conn, oic_item);
996                 cfs_list_del_init(&imp_conn->oic_item);
997                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
998                 OBD_FREE(imp_conn, sizeof(*imp_conn));
999         }
1000
1001         LASSERT(imp->imp_sec == NULL);
1002         class_decref(imp->imp_obd, "import", imp);
1003         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
1004         EXIT;
1005 }
1006
/* hop_addref callback for import handles: take a reference on the import. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
1011
1012 static struct portals_handle_ops import_handle_ops = {
1013         .hop_addref = import_handle_addref,
1014         .hop_free   = NULL,
1015 };
1016
1017 struct obd_import *class_import_get(struct obd_import *import)
1018 {
1019         atomic_inc(&import->imp_refcount);
1020         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1021                atomic_read(&import->imp_refcount),
1022                import->imp_obd->obd_name);
1023         return import;
1024 }
1025 EXPORT_SYMBOL(class_import_get);
1026
1027 void class_import_put(struct obd_import *imp)
1028 {
1029         ENTRY;
1030
1031         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1032         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1033
1034         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1035                atomic_read(&imp->imp_refcount) - 1,
1036                imp->imp_obd->obd_name);
1037
1038         if (atomic_dec_and_test(&imp->imp_refcount)) {
1039                 CDEBUG(D_INFO, "final put import %p\n", imp);
1040                 obd_zombie_import_add(imp);
1041         }
1042
1043         /* catch possible import put race */
1044         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1045         EXIT;
1046 }
1047 EXPORT_SYMBOL(class_import_put);
1048
1049 static void init_imp_at(struct imp_at *at) {
1050         int i;
1051         at_init(&at->iat_net_latency, 0, 0);
1052         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1053                 /* max service estimates are tracked on the server side, so
1054                    don't use the AT history here, just use the last reported
1055                    val. (But keep hist for proc histogram, worst_ever) */
1056                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1057                         AT_FLG_NOHIST);
1058         }
1059 }
1060
1061 struct obd_import *class_new_import(struct obd_device *obd)
1062 {
1063         struct obd_import *imp;
1064
1065         OBD_ALLOC(imp, sizeof(*imp));
1066         if (imp == NULL)
1067                 return NULL;
1068
1069         CFS_INIT_LIST_HEAD(&imp->imp_pinger_chain);
1070         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1071         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1072         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1073         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1074         CFS_INIT_LIST_HEAD(&imp->imp_committed_list);
1075         imp->imp_replay_cursor = &imp->imp_committed_list;
1076         spin_lock_init(&imp->imp_lock);
1077         imp->imp_last_success_conn = 0;
1078         imp->imp_state = LUSTRE_IMP_NEW;
1079         imp->imp_obd = class_incref(obd, "import", imp);
1080         mutex_init(&imp->imp_sec_mutex);
1081         init_waitqueue_head(&imp->imp_recovery_waitq);
1082
1083         atomic_set(&imp->imp_refcount, 2);
1084         atomic_set(&imp->imp_unregistering, 0);
1085         atomic_set(&imp->imp_inflight, 0);
1086         atomic_set(&imp->imp_replay_inflight, 0);
1087         atomic_set(&imp->imp_inval_count, 0);
1088         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1089         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1090         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1091         init_imp_at(&imp->imp_at);
1092
1093         /* the default magic is V2, will be used in connect RPC, and
1094          * then adjusted according to the flags in request/reply. */
1095         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1096
1097         return imp;
1098 }
1099 EXPORT_SYMBOL(class_new_import);
1100
1101 void class_destroy_import(struct obd_import *import)
1102 {
1103         LASSERT(import != NULL);
1104         LASSERT(import != LP_POISON);
1105
1106         class_handle_unhash(&import->imp_handle);
1107
1108         spin_lock(&import->imp_lock);
1109         import->imp_generation++;
1110         spin_unlock(&import->imp_lock);
1111         class_import_put(import);
1112 }
1113 EXPORT_SYMBOL(class_destroy_import);
1114
1115 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1116
1117 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1118 {
1119         spin_lock(&exp->exp_locks_list_guard);
1120
1121         LASSERT(lock->l_exp_refs_nr >= 0);
1122
1123         if (lock->l_exp_refs_target != NULL &&
1124             lock->l_exp_refs_target != exp) {
1125                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1126                               exp, lock, lock->l_exp_refs_target);
1127         }
1128         if ((lock->l_exp_refs_nr ++) == 0) {
1129                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1130                 lock->l_exp_refs_target = exp;
1131         }
1132         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1133                lock, exp, lock->l_exp_refs_nr);
1134         spin_unlock(&exp->exp_locks_list_guard);
1135 }
1136 EXPORT_SYMBOL(__class_export_add_lock_ref);
1137
1138 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1139 {
1140         spin_lock(&exp->exp_locks_list_guard);
1141         LASSERT(lock->l_exp_refs_nr > 0);
1142         if (lock->l_exp_refs_target != exp) {
1143                 LCONSOLE_WARN("lock %p, "
1144                               "mismatching export pointers: %p, %p\n",
1145                               lock, lock->l_exp_refs_target, exp);
1146         }
1147         if (-- lock->l_exp_refs_nr == 0) {
1148                 cfs_list_del_init(&lock->l_exp_refs_link);
1149                 lock->l_exp_refs_target = NULL;
1150         }
1151         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1152                lock, exp, lock->l_exp_refs_nr);
1153         spin_unlock(&exp->exp_locks_list_guard);
1154 }
1155 EXPORT_SYMBOL(__class_export_del_lock_ref);
1156 #endif
1157
1158 /* A connection defines an export context in which preallocation can
1159    be managed. This releases the export pointer reference, and returns
1160    the export handle, so the export refcount is 1 when this function
1161    returns. */
1162 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1163                   struct obd_uuid *cluuid)
1164 {
1165         struct obd_export *export;
1166         LASSERT(conn != NULL);
1167         LASSERT(obd != NULL);
1168         LASSERT(cluuid != NULL);
1169         ENTRY;
1170
1171         export = class_new_export(obd, cluuid);
1172         if (IS_ERR(export))
1173                 RETURN(PTR_ERR(export));
1174
1175         conn->cookie = export->exp_handle.h_cookie;
1176         class_export_put(export);
1177
1178         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1179                cluuid->uuid, conn->cookie);
1180         RETURN(0);
1181 }
1182 EXPORT_SYMBOL(class_connect);
1183
1184 /* if export is involved in recovery then clean up related things */
1185 void class_export_recovery_cleanup(struct obd_export *exp)
1186 {
1187         struct obd_device *obd = exp->exp_obd;
1188
1189         spin_lock(&obd->obd_recovery_task_lock);
1190         if (exp->exp_delayed)
1191                 obd->obd_delayed_clients--;
1192         if (obd->obd_recovering) {
1193                 if (exp->exp_in_recovery) {
1194                         spin_lock(&exp->exp_lock);
1195                         exp->exp_in_recovery = 0;
1196                         spin_unlock(&exp->exp_lock);
1197                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1198                         atomic_dec(&obd->obd_connected_clients);
1199                 }
1200
1201                 /* if called during recovery then should update
1202                  * obd_stale_clients counter,
1203                  * lightweight exports are not counted */
1204                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1205                         exp->exp_obd->obd_stale_clients++;
1206         }
1207         spin_unlock(&obd->obd_recovery_task_lock);
1208         /** Cleanup req replay fields */
1209         if (exp->exp_req_replay_needed) {
1210                 spin_lock(&exp->exp_lock);
1211                 exp->exp_req_replay_needed = 0;
1212                 spin_unlock(&exp->exp_lock);
1213                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1214                 atomic_dec(&obd->obd_req_replay_clients);
1215         }
1216         /** Cleanup lock replay data */
1217         if (exp->exp_lock_replay_needed) {
1218                 spin_lock(&exp->exp_lock);
1219                 exp->exp_lock_replay_needed = 0;
1220                 spin_unlock(&exp->exp_lock);
1221                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1222                 atomic_dec(&obd->obd_lock_replay_clients);
1223         }
1224 }
1225
1226 /* This function removes 1-3 references from the export:
1227  * 1 - for export pointer passed
1228  * and if disconnect really need
1229  * 2 - removing from hash
1230  * 3 - in client_unlink_export
1231  * The export pointer passed to this function can destroyed */
1232 int class_disconnect(struct obd_export *export)
1233 {
1234         int already_disconnected;
1235         ENTRY;
1236
1237         if (export == NULL) {
1238                 CWARN("attempting to free NULL export %p\n", export);
1239                 RETURN(-EINVAL);
1240         }
1241
1242         spin_lock(&export->exp_lock);
1243         already_disconnected = export->exp_disconnected;
1244         export->exp_disconnected = 1;
1245         spin_unlock(&export->exp_lock);
1246
1247         /* class_cleanup(), abort_recovery(), and class_fail_export()
1248          * all end up in here, and if any of them race we shouldn't
1249          * call extra class_export_puts(). */
1250         if (already_disconnected) {
1251                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1252                 GOTO(no_disconn, already_disconnected);
1253         }
1254
1255         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1256                export->exp_handle.h_cookie);
1257
1258         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1259                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1260                              &export->exp_connection->c_peer.nid,
1261                              &export->exp_nid_hash);
1262
1263         class_export_recovery_cleanup(export);
1264         class_unlink_export(export);
1265 no_disconn:
1266         class_export_put(export);
1267         RETURN(0);
1268 }
1269 EXPORT_SYMBOL(class_disconnect);
1270
1271 /* Return non-zero for a fully connected export */
1272 int class_connected_export(struct obd_export *exp)
1273 {
1274         int connected = 0;
1275
1276         if (exp) {
1277                 spin_lock(&exp->exp_lock);
1278                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1279                 spin_unlock(&exp->exp_lock);
1280         }
1281         return connected;
1282 }
1283 EXPORT_SYMBOL(class_connected_export);
1284
1285 static void class_disconnect_export_list(cfs_list_t *list,
1286                                          enum obd_option flags)
1287 {
1288         int rc;
1289         struct obd_export *exp;
1290         ENTRY;
1291
1292         /* It's possible that an export may disconnect itself, but
1293          * nothing else will be added to this list. */
1294         while (!cfs_list_empty(list)) {
1295                 exp = cfs_list_entry(list->next, struct obd_export,
1296                                      exp_obd_chain);
1297                 /* need for safe call CDEBUG after obd_disconnect */
1298                 class_export_get(exp);
1299
1300                 spin_lock(&exp->exp_lock);
1301                 exp->exp_flags = flags;
1302                 spin_unlock(&exp->exp_lock);
1303
1304                 if (obd_uuid_equals(&exp->exp_client_uuid,
1305                                     &exp->exp_obd->obd_uuid)) {
1306                         CDEBUG(D_HA,
1307                                "exp %p export uuid == obd uuid, don't discon\n",
1308                                exp);
1309                         /* Need to delete this now so we don't end up pointing
1310                          * to work_list later when this export is cleaned up. */
1311                         cfs_list_del_init(&exp->exp_obd_chain);
1312                         class_export_put(exp);
1313                         continue;
1314                 }
1315
1316                 class_export_get(exp);
1317                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1318                        "last request at "CFS_TIME_T"\n",
1319                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1320                        exp, exp->exp_last_request_time);
1321                 /* release one export reference anyway */
1322                 rc = obd_disconnect(exp);
1323
1324                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1325                        obd_export_nid2str(exp), exp, rc);
1326                 class_export_put(exp);
1327         }
1328         EXIT;
1329 }
1330
1331 void class_disconnect_exports(struct obd_device *obd)
1332 {
1333         cfs_list_t work_list;
1334         ENTRY;
1335
1336         /* Move all of the exports from obd_exports to a work list, en masse. */
1337         CFS_INIT_LIST_HEAD(&work_list);
1338         spin_lock(&obd->obd_dev_lock);
1339         cfs_list_splice_init(&obd->obd_exports, &work_list);
1340         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1341         spin_unlock(&obd->obd_dev_lock);
1342
1343         if (!cfs_list_empty(&work_list)) {
1344                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1345                        "disconnecting them\n", obd->obd_minor, obd);
1346                 class_disconnect_export_list(&work_list,
1347                                              exp_flags_from_obd(obd));
1348         } else
1349                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1350                        obd->obd_minor, obd);
1351         EXIT;
1352 }
1353 EXPORT_SYMBOL(class_disconnect_exports);
1354
1355 /* Remove exports that have not completed recovery.
1356  */
1357 void class_disconnect_stale_exports(struct obd_device *obd,
1358                                     int (*test_export)(struct obd_export *))
1359 {
1360         cfs_list_t work_list;
1361         struct obd_export *exp, *n;
1362         int evicted = 0;
1363         ENTRY;
1364
1365         CFS_INIT_LIST_HEAD(&work_list);
1366         spin_lock(&obd->obd_dev_lock);
1367         cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1368                                      exp_obd_chain) {
1369                 /* don't count self-export as client */
1370                 if (obd_uuid_equals(&exp->exp_client_uuid,
1371                                     &exp->exp_obd->obd_uuid))
1372                         continue;
1373
1374                 /* don't evict clients which have no slot in last_rcvd
1375                  * (e.g. lightweight connection) */
1376                 if (exp->exp_target_data.ted_lr_idx == -1)
1377                         continue;
1378
1379                 spin_lock(&exp->exp_lock);
1380                 if (exp->exp_failed || test_export(exp)) {
1381                         spin_unlock(&exp->exp_lock);
1382                         continue;
1383                 }
1384                 exp->exp_failed = 1;
1385                 spin_unlock(&exp->exp_lock);
1386
1387                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1388                 evicted++;
1389                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1390                        obd->obd_name, exp->exp_client_uuid.uuid,
1391                        exp->exp_connection == NULL ? "<unknown>" :
1392                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1393                 print_export_data(exp, "EVICTING", 0);
1394         }
1395         spin_unlock(&obd->obd_dev_lock);
1396
1397         if (evicted)
1398                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1399                               obd->obd_name, evicted);
1400
1401         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1402                                                  OBD_OPT_ABORT_RECOV);
1403         EXIT;
1404 }
1405 EXPORT_SYMBOL(class_disconnect_stale_exports);
1406
1407 void class_fail_export(struct obd_export *exp)
1408 {
1409         int rc, already_failed;
1410
1411         spin_lock(&exp->exp_lock);
1412         already_failed = exp->exp_failed;
1413         exp->exp_failed = 1;
1414         spin_unlock(&exp->exp_lock);
1415
1416         if (already_failed) {
1417                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1418                        exp, exp->exp_client_uuid.uuid);
1419                 return;
1420         }
1421
1422         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1423                exp, exp->exp_client_uuid.uuid);
1424
1425         if (obd_dump_on_timeout)
1426                 libcfs_debug_dumplog();
1427
1428         /* need for safe call CDEBUG after obd_disconnect */
1429         class_export_get(exp);
1430
1431         /* Most callers into obd_disconnect are removing their own reference
1432          * (request, for example) in addition to the one from the hash table.
1433          * We don't have such a reference here, so make one. */
1434         class_export_get(exp);
1435         rc = obd_disconnect(exp);
1436         if (rc)
1437                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1438         else
1439                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1440                        exp, exp->exp_client_uuid.uuid);
1441         class_export_put(exp);
1442 }
1443 EXPORT_SYMBOL(class_fail_export);
1444
1445 char *obd_export_nid2str(struct obd_export *exp)
1446 {
1447         if (exp->exp_connection != NULL)
1448                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1449
1450         return "(no nid)";
1451 }
1452 EXPORT_SYMBOL(obd_export_nid2str);
1453
1454 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1455 {
1456         cfs_hash_t *nid_hash;
1457         struct obd_export *doomed_exp = NULL;
1458         int exports_evicted = 0;
1459
1460         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1461
1462         spin_lock(&obd->obd_dev_lock);
1463         /* umount has run already, so evict thread should leave
1464          * its task to umount thread now */
1465         if (obd->obd_stopping) {
1466                 spin_unlock(&obd->obd_dev_lock);
1467                 return exports_evicted;
1468         }
1469         nid_hash = obd->obd_nid_hash;
1470         cfs_hash_getref(nid_hash);
1471         spin_unlock(&obd->obd_dev_lock);
1472
1473         do {
1474                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1475                 if (doomed_exp == NULL)
1476                         break;
1477
1478                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1479                          "nid %s found, wanted nid %s, requested nid %s\n",
1480                          obd_export_nid2str(doomed_exp),
1481                          libcfs_nid2str(nid_key), nid);
1482                 LASSERTF(doomed_exp != obd->obd_self_export,
1483                          "self-export is hashed by NID?\n");
1484                 exports_evicted++;
1485                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1486                               "request\n", obd->obd_name,
1487                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1488                               obd_export_nid2str(doomed_exp));
1489                 class_fail_export(doomed_exp);
1490                 class_export_put(doomed_exp);
1491         } while (1);
1492
1493         cfs_hash_putref(nid_hash);
1494
1495         if (!exports_evicted)
1496                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1497                        obd->obd_name, nid);
1498         return exports_evicted;
1499 }
1500 EXPORT_SYMBOL(obd_export_evict_by_nid);
1501
1502 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1503 {
1504         cfs_hash_t *uuid_hash;
1505         struct obd_export *doomed_exp = NULL;
1506         struct obd_uuid doomed_uuid;
1507         int exports_evicted = 0;
1508
1509         spin_lock(&obd->obd_dev_lock);
1510         if (obd->obd_stopping) {
1511                 spin_unlock(&obd->obd_dev_lock);
1512                 return exports_evicted;
1513         }
1514         uuid_hash = obd->obd_uuid_hash;
1515         cfs_hash_getref(uuid_hash);
1516         spin_unlock(&obd->obd_dev_lock);
1517
1518         obd_str2uuid(&doomed_uuid, uuid);
1519         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1520                 CERROR("%s: can't evict myself\n", obd->obd_name);
1521                 cfs_hash_putref(uuid_hash);
1522                 return exports_evicted;
1523         }
1524
1525         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1526
1527         if (doomed_exp == NULL) {
1528                 CERROR("%s: can't disconnect %s: no exports found\n",
1529                        obd->obd_name, uuid);
1530         } else {
1531                 CWARN("%s: evicting %s at adminstrative request\n",
1532                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1533                 class_fail_export(doomed_exp);
1534                 class_export_put(doomed_exp);
1535                 exports_evicted++;
1536         }
1537         cfs_hash_putref(uuid_hash);
1538
1539         return exports_evicted;
1540 }
1541 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1542
1543 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1544 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1545 EXPORT_SYMBOL(class_export_dump_hook);
1546 #endif
1547
1548 static void print_export_data(struct obd_export *exp, const char *status,
1549                               int locks)
1550 {
1551         struct ptlrpc_reply_state *rs;
1552         struct ptlrpc_reply_state *first_reply = NULL;
1553         int nreplies = 0;
1554
1555         spin_lock(&exp->exp_lock);
1556         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1557                                 rs_exp_list) {
1558                 if (nreplies == 0)
1559                         first_reply = rs;
1560                 nreplies++;
1561         }
1562         spin_unlock(&exp->exp_lock);
1563
1564         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1565                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1566                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1567                atomic_read(&exp->exp_rpc_count),
1568                atomic_read(&exp->exp_cb_count),
1569                atomic_read(&exp->exp_locks_count),
1570                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1571                nreplies, first_reply, nreplies > 3 ? "..." : "",
1572                exp->exp_last_committed);
1573 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1574         if (locks && class_export_dump_hook != NULL)
1575                 class_export_dump_hook(exp);
1576 #endif
1577 }
1578
1579 void dump_exports(struct obd_device *obd, int locks)
1580 {
1581         struct obd_export *exp;
1582
1583         spin_lock(&obd->obd_dev_lock);
1584         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1585                 print_export_data(exp, "ACTIVE", locks);
1586         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1587                 print_export_data(exp, "UNLINKED", locks);
1588         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1589                 print_export_data(exp, "DELAYED", locks);
1590         spin_unlock(&obd->obd_dev_lock);
1591         spin_lock(&obd_zombie_impexp_lock);
1592         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1593                 print_export_data(exp, "ZOMBIE", locks);
1594         spin_unlock(&obd_zombie_impexp_lock);
1595 }
1596 EXPORT_SYMBOL(dump_exports);
1597
1598 void obd_exports_barrier(struct obd_device *obd)
1599 {
1600         int waited = 2;
1601         LASSERT(cfs_list_empty(&obd->obd_exports));
1602         spin_lock(&obd->obd_dev_lock);
1603         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1604                 spin_unlock(&obd->obd_dev_lock);
1605                 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1606                                                    cfs_time_seconds(waited));
1607                 if (waited > 5 && IS_PO2(waited)) {
1608                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1609                                       "more than %d seconds. "
1610                                       "The obd refcount = %d. Is it stuck?\n",
1611                                       obd->obd_name, waited,
1612                                       atomic_read(&obd->obd_refcount));
1613                         dump_exports(obd, 1);
1614                 }
1615                 waited *= 2;
1616                 spin_lock(&obd->obd_dev_lock);
1617         }
1618         spin_unlock(&obd->obd_dev_lock);
1619 }
1620 EXPORT_SYMBOL(obd_exports_barrier);
1621
/* Number of zombie imports and exports still waiting to be destroyed;
 * modified under obd_zombie_impexp_lock. */
static int zombies_count;
1624
1625 /**
1626  * kill zombie imports and exports
1627  */
1628 void obd_zombie_impexp_cull(void)
1629 {
1630         struct obd_import *import;
1631         struct obd_export *export;
1632         ENTRY;
1633
1634         do {
1635                 spin_lock(&obd_zombie_impexp_lock);
1636
1637                 import = NULL;
1638                 if (!cfs_list_empty(&obd_zombie_imports)) {
1639                         import = cfs_list_entry(obd_zombie_imports.next,
1640                                                 struct obd_import,
1641                                                 imp_zombie_chain);
1642                         cfs_list_del_init(&import->imp_zombie_chain);
1643                 }
1644
1645                 export = NULL;
1646                 if (!cfs_list_empty(&obd_zombie_exports)) {
1647                         export = cfs_list_entry(obd_zombie_exports.next,
1648                                                 struct obd_export,
1649                                                 exp_obd_chain);
1650                         cfs_list_del_init(&export->exp_obd_chain);
1651                 }
1652
1653                 spin_unlock(&obd_zombie_impexp_lock);
1654
1655                 if (import != NULL) {
1656                         class_import_destroy(import);
1657                         spin_lock(&obd_zombie_impexp_lock);
1658                         zombies_count--;
1659                         spin_unlock(&obd_zombie_impexp_lock);
1660                 }
1661
1662                 if (export != NULL) {
1663                         class_export_destroy(export);
1664                         spin_lock(&obd_zombie_impexp_lock);
1665                         zombies_count--;
1666                         spin_unlock(&obd_zombie_impexp_lock);
1667                 }
1668
1669                 cond_resched();
1670         } while (import != NULL || export != NULL);
1671         EXIT;
1672 }
1673
1674 static struct completion        obd_zombie_start;
1675 static struct completion        obd_zombie_stop;
1676 static unsigned long            obd_zombie_flags;
1677 static wait_queue_head_t        obd_zombie_waitq;
1678 static pid_t                    obd_zombie_pid;
1679
1680 enum {
1681         OBD_ZOMBIE_STOP         = 0x0001,
1682 };
1683
1684 /**
1685  * check for work for kill zombie import/export thread.
1686  */
1687 static int obd_zombie_impexp_check(void *arg)
1688 {
1689         int rc;
1690
1691         spin_lock(&obd_zombie_impexp_lock);
1692         rc = (zombies_count == 0) &&
1693              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1694         spin_unlock(&obd_zombie_impexp_lock);
1695
1696         RETURN(rc);
1697 }
1698
1699 /**
1700  * Add export to the obd_zombe thread and notify it.
1701  */
1702 static void obd_zombie_export_add(struct obd_export *exp) {
1703         spin_lock(&exp->exp_obd->obd_dev_lock);
1704         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1705         cfs_list_del_init(&exp->exp_obd_chain);
1706         spin_unlock(&exp->exp_obd->obd_dev_lock);
1707         spin_lock(&obd_zombie_impexp_lock);
1708         zombies_count++;
1709         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1710         spin_unlock(&obd_zombie_impexp_lock);
1711
1712         obd_zombie_impexp_notify();
1713 }
1714
1715 /**
1716  * Add import to the obd_zombe thread and notify it.
1717  */
1718 static void obd_zombie_import_add(struct obd_import *imp) {
1719         LASSERT(imp->imp_sec == NULL);
1720         LASSERT(imp->imp_rq_pool == NULL);
1721         spin_lock(&obd_zombie_impexp_lock);
1722         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1723         zombies_count++;
1724         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1725         spin_unlock(&obd_zombie_impexp_lock);
1726
1727         obd_zombie_impexp_notify();
1728 }
1729
1730 /**
1731  * notify import/export destroy thread about new zombie.
1732  */
1733 static void obd_zombie_impexp_notify(void)
1734 {
1735         /*
1736          * Make sure obd_zomebie_impexp_thread get this notification.
1737          * It is possible this signal only get by obd_zombie_barrier, and
1738          * barrier gulps this notification and sleeps away and hangs ensues
1739          */
1740         wake_up_all(&obd_zombie_waitq);
1741 }
1742
1743 /**
1744  * check whether obd_zombie is idle
1745  */
1746 static int obd_zombie_is_idle(void)
1747 {
1748         int rc;
1749
1750         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1751         spin_lock(&obd_zombie_impexp_lock);
1752         rc = (zombies_count == 0);
1753         spin_unlock(&obd_zombie_impexp_lock);
1754         return rc;
1755 }
1756
1757 /**
1758  * wait when obd_zombie import/export queues become empty
1759  */
1760 void obd_zombie_barrier(void)
1761 {
1762         struct l_wait_info lwi = { 0 };
1763
1764         if (obd_zombie_pid == current_pid())
1765                 /* don't wait for myself */
1766                 return;
1767         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1768 }
1769 EXPORT_SYMBOL(obd_zombie_barrier);
1770
1771 #ifdef __KERNEL__
1772
1773 /**
1774  * destroy zombie export/import thread.
1775  */
1776 static int obd_zombie_impexp_thread(void *unused)
1777 {
1778         unshare_fs_struct();
1779         complete(&obd_zombie_start);
1780
1781         obd_zombie_pid = current_pid();
1782
1783         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1784                 struct l_wait_info lwi = { 0 };
1785
1786                 l_wait_event(obd_zombie_waitq,
1787                              !obd_zombie_impexp_check(NULL), &lwi);
1788                 obd_zombie_impexp_cull();
1789
1790                 /*
1791                  * Notify obd_zombie_barrier callers that queues
1792                  * may be empty.
1793                  */
1794                 wake_up(&obd_zombie_waitq);
1795         }
1796
1797         complete(&obd_zombie_stop);
1798
1799         RETURN(0);
1800 }
1801
1802 #else /* ! KERNEL */
1803
1804 static atomic_t zombie_recur = ATOMIC_INIT(0);
1805 static void *obd_zombie_impexp_work_cb;
1806 static void *obd_zombie_impexp_idle_cb;
1807
1808 int obd_zombie_impexp_kill(void *arg)
1809 {
1810         int rc = 0;
1811
1812         if (atomic_inc_return(&zombie_recur) == 1) {
1813                 obd_zombie_impexp_cull();
1814                 rc = 1;
1815         }
1816         atomic_dec(&zombie_recur);
1817         return rc;
1818 }
1819
1820 #endif
1821
1822 /**
1823  * start destroy zombie import/export thread
1824  */
1825 int obd_zombie_impexp_init(void)
1826 {
1827 #ifdef __KERNEL__
1828         struct task_struct *task;
1829 #endif
1830
1831         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1832         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1833         spin_lock_init(&obd_zombie_impexp_lock);
1834         init_completion(&obd_zombie_start);
1835         init_completion(&obd_zombie_stop);
1836         init_waitqueue_head(&obd_zombie_waitq);
1837         obd_zombie_pid = 0;
1838
1839 #ifdef __KERNEL__
1840         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1841         if (IS_ERR(task))
1842                 RETURN(PTR_ERR(task));
1843
1844         wait_for_completion(&obd_zombie_start);
1845 #else
1846
1847         obd_zombie_impexp_work_cb =
1848                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1849                                                  &obd_zombie_impexp_kill, NULL);
1850
1851         obd_zombie_impexp_idle_cb =
1852                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1853                                                  &obd_zombie_impexp_check, NULL);
1854 #endif
1855         RETURN(0);
1856 }
1857 /**
1858  * stop destroy zombie import/export thread
1859  */
1860 void obd_zombie_impexp_stop(void)
1861 {
1862         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1863         obd_zombie_impexp_notify();
1864 #ifdef __KERNEL__
1865         wait_for_completion(&obd_zombie_stop);
1866 #else
1867         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1868         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1869 #endif
1870 }
1871
1872 /***** Kernel-userspace comm helpers *******/
1873
1874 /* Get length of entire message, including header */
1875 int kuc_len(int payload_len)
1876 {
1877         return sizeof(struct kuc_hdr) + payload_len;
1878 }
1879 EXPORT_SYMBOL(kuc_len);
1880
1881 /* Get a pointer to kuc header, given a ptr to the payload
1882  * @param p Pointer to payload area
1883  * @returns Pointer to kuc header
1884  */
1885 struct kuc_hdr * kuc_ptr(void *p)
1886 {
1887         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1888         LASSERT(lh->kuc_magic == KUC_MAGIC);
1889         return lh;
1890 }
1891 EXPORT_SYMBOL(kuc_ptr);
1892
1893 /* Test if payload is part of kuc message
1894  * @param p Pointer to payload area
1895  * @returns boolean
1896  */
1897 int kuc_ispayload(void *p)
1898 {
1899         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1900
1901         if (kh->kuc_magic == KUC_MAGIC)
1902                 return 1;
1903         else
1904                 return 0;
1905 }
1906 EXPORT_SYMBOL(kuc_ispayload);
1907
1908 /* Alloc space for a message, and fill in header
1909  * @return Pointer to payload area
1910  */
1911 void *kuc_alloc(int payload_len, int transport, int type)
1912 {
1913         struct kuc_hdr *lh;
1914         int len = kuc_len(payload_len);
1915
1916         OBD_ALLOC(lh, len);
1917         if (lh == NULL)
1918                 return ERR_PTR(-ENOMEM);
1919
1920         lh->kuc_magic = KUC_MAGIC;
1921         lh->kuc_transport = transport;
1922         lh->kuc_msgtype = type;
1923         lh->kuc_msglen = len;
1924
1925         return (void *)(lh + 1);
1926 }
1927 EXPORT_SYMBOL(kuc_alloc);
1928
/* Free a kuc message.
 * The size released is kuc_len(payload_len), mirroring the allocation in
 * kuc_alloc(), so pass the same payload_len that was used to allocate.
 * kuc_ptr() asserts that the header in front of @p carries KUC_MAGIC.
 * @param p Pointer to payload area
 * @param payload_len Size of the payload area in bytes
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1936
1937
1938