Whamcloud - gitweb
LU-2467 ptlrpc: Allow OBD_PINGs to be suppressed
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
50 extern cfs_list_t obd_types;
51 spinlock_t obd_types_lock;
52
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
57
58 cfs_list_t      obd_zombie_imports;
59 cfs_list_t      obd_zombie_exports;
60 spinlock_t  obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         cfs_list_t *tmp;
102         struct obd_type *type;
103
104         spin_lock(&obd_types_lock);
105         cfs_list_for_each(tmp, &obd_types) {
106                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115 EXPORT_SYMBOL(class_search_type);
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124
125                 if (strcmp(modname, "obdfilter") == 0)
126                         modname = "ofd";
127
128                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129                         modname = LUSTRE_OSP_NAME;
130
131                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132                         modname = LUSTRE_MDT_NAME;
133
134                 if (!cfs_request_module("%s", modname)) {
135                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136                         type = class_search_type(name);
137                 } else {
138                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139                                            modname);
140                 }
141         }
142 #endif
143         if (type) {
144                 spin_lock(&type->obd_type_lock);
145                 type->typ_refcnt++;
146                 cfs_try_module_get(type->typ_dt_ops->o_owner);
147                 spin_unlock(&type->obd_type_lock);
148         }
149         return type;
150 }
151 EXPORT_SYMBOL(class_get_type);
152
153 void class_put_type(struct obd_type *type)
154 {
155         LASSERT(type);
156         spin_lock(&type->obd_type_lock);
157         type->typ_refcnt--;
158         cfs_module_put(type->typ_dt_ops->o_owner);
159         spin_unlock(&type->obd_type_lock);
160 }
161 EXPORT_SYMBOL(class_put_type);
162
163 #define CLASS_MAX_NAME 1024
164
165 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
166                         struct lprocfs_vars *vars, const char *name,
167                         struct lu_device_type *ldt)
168 {
169         struct obd_type *type;
170         int rc = 0;
171         ENTRY;
172
173         /* sanity check */
174         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
175
176         if (class_search_type(name)) {
177                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
178                 RETURN(-EEXIST);
179         }
180
181         rc = -ENOMEM;
182         OBD_ALLOC(type, sizeof(*type));
183         if (type == NULL)
184                 RETURN(rc);
185
186         OBD_ALLOC_PTR(type->typ_dt_ops);
187         OBD_ALLOC_PTR(type->typ_md_ops);
188         OBD_ALLOC(type->typ_name, strlen(name) + 1);
189
190         if (type->typ_dt_ops == NULL ||
191             type->typ_md_ops == NULL ||
192             type->typ_name == NULL)
193                 GOTO (failed, rc);
194
195         *(type->typ_dt_ops) = *dt_ops;
196         /* md_ops is optional */
197         if (md_ops)
198                 *(type->typ_md_ops) = *md_ops;
199         strcpy(type->typ_name, name);
200         spin_lock_init(&type->obd_type_lock);
201
202 #ifdef LPROCFS
203         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
204                                               vars, type);
205         if (IS_ERR(type->typ_procroot)) {
206                 rc = PTR_ERR(type->typ_procroot);
207                 type->typ_procroot = NULL;
208                 GOTO (failed, rc);
209         }
210 #endif
211         if (ldt != NULL) {
212                 type->typ_lu = ldt;
213                 rc = lu_device_type_init(ldt);
214                 if (rc != 0)
215                         GOTO (failed, rc);
216         }
217
218         spin_lock(&obd_types_lock);
219         cfs_list_add(&type->typ_chain, &obd_types);
220         spin_unlock(&obd_types_lock);
221
222         RETURN (0);
223
224  failed:
225         if (type->typ_name != NULL)
226                 OBD_FREE(type->typ_name, strlen(name) + 1);
227         if (type->typ_md_ops != NULL)
228                 OBD_FREE_PTR(type->typ_md_ops);
229         if (type->typ_dt_ops != NULL)
230                 OBD_FREE_PTR(type->typ_dt_ops);
231         OBD_FREE(type, sizeof(*type));
232         RETURN(rc);
233 }
234 EXPORT_SYMBOL(class_register_type);
235
236 int class_unregister_type(const char *name)
237 {
238         struct obd_type *type = class_search_type(name);
239         ENTRY;
240
241         if (!type) {
242                 CERROR("unknown obd type\n");
243                 RETURN(-EINVAL);
244         }
245
246         if (type->typ_refcnt) {
247                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
248                 /* This is a bad situation, let's make the best of it */
249                 /* Remove ops, but leave the name for debugging */
250                 OBD_FREE_PTR(type->typ_dt_ops);
251                 OBD_FREE_PTR(type->typ_md_ops);
252                 RETURN(-EBUSY);
253         }
254
255         /* we do not use type->typ_procroot as for compatibility purposes
256          * other modules can share names (i.e. lod can use lov entry). so
257          * we can't reference pointer as it can get invalided when another
258          * module removes the entry */
259         lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
260
261         if (type->typ_lu)
262                 lu_device_type_fini(type->typ_lu);
263
264         spin_lock(&obd_types_lock);
265         cfs_list_del(&type->typ_chain);
266         spin_unlock(&obd_types_lock);
267         OBD_FREE(type->typ_name, strlen(name) + 1);
268         if (type->typ_dt_ops != NULL)
269                 OBD_FREE_PTR(type->typ_dt_ops);
270         if (type->typ_md_ops != NULL)
271                 OBD_FREE_PTR(type->typ_md_ops);
272         OBD_FREE(type, sizeof(*type));
273         RETURN(0);
274 } /* class_unregister_type */
275 EXPORT_SYMBOL(class_unregister_type);
276
277 /**
278  * Create a new obd device.
279  *
280  * Find an empty slot in ::obd_devs[], create a new obd device in it.
281  *
282  * \param[in] type_name obd device type string.
283  * \param[in] name      obd device name.
284  *
285  * \retval NULL if create fails, otherwise return the obd device
286  *         pointer created.
287  */
288 struct obd_device *class_newdev(const char *type_name, const char *name)
289 {
290         struct obd_device *result = NULL;
291         struct obd_device *newdev;
292         struct obd_type *type = NULL;
293         int i;
294         int new_obd_minor = 0;
295         ENTRY;
296
297         if (strlen(name) >= MAX_OBD_NAME) {
298                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
299                 RETURN(ERR_PTR(-EINVAL));
300         }
301
302         type = class_get_type(type_name);
303         if (type == NULL){
304                 CERROR("OBD: unknown type: %s\n", type_name);
305                 RETURN(ERR_PTR(-ENODEV));
306         }
307
308         newdev = obd_device_alloc();
309         if (newdev == NULL)
310                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
311
312         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
313
314         write_lock(&obd_dev_lock);
315         for (i = 0; i < class_devno_max(); i++) {
316                 struct obd_device *obd = class_num2obd(i);
317
318                 if (obd && obd->obd_name &&
319                     (strcmp(name, obd->obd_name) == 0)) {
320                         CERROR("Device %s already exists at %d, won't add\n",
321                                name, i);
322                         if (result) {
323                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
324                                          "%p obd_magic %08x != %08x\n", result,
325                                          result->obd_magic, OBD_DEVICE_MAGIC);
326                                 LASSERTF(result->obd_minor == new_obd_minor,
327                                          "%p obd_minor %d != %d\n", result,
328                                          result->obd_minor, new_obd_minor);
329
330                                 obd_devs[result->obd_minor] = NULL;
331                                 result->obd_name[0]='\0';
332                          }
333                         result = ERR_PTR(-EEXIST);
334                         break;
335                 }
336                 if (!result && !obd) {
337                         result = newdev;
338                         result->obd_minor = i;
339                         new_obd_minor = i;
340                         result->obd_type = type;
341                         strncpy(result->obd_name, name,
342                                 sizeof(result->obd_name) - 1);
343                         obd_devs[i] = result;
344                 }
345         }
346         write_unlock(&obd_dev_lock);
347
348         if (result == NULL && i >= class_devno_max()) {
349                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
350                        class_devno_max());
351                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
352         }
353
354         if (IS_ERR(result))
355                 GOTO(out, result);
356
357         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
358                result->obd_name, result);
359
360         RETURN(result);
361 out:
362         obd_device_free(newdev);
363 out_type:
364         class_put_type(type);
365         return result;
366 }
367
368 void class_release_dev(struct obd_device *obd)
369 {
370         struct obd_type *obd_type = obd->obd_type;
371
372         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
373                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
374         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
375                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
376         LASSERT(obd_type != NULL);
377
378         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
379                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
380
381         write_lock(&obd_dev_lock);
382         obd_devs[obd->obd_minor] = NULL;
383         write_unlock(&obd_dev_lock);
384         obd_device_free(obd);
385
386         class_put_type(obd_type);
387 }
388
389 int class_name2dev(const char *name)
390 {
391         int i;
392
393         if (!name)
394                 return -1;
395
396         read_lock(&obd_dev_lock);
397         for (i = 0; i < class_devno_max(); i++) {
398                 struct obd_device *obd = class_num2obd(i);
399
400                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
401                         /* Make sure we finished attaching before we give
402                            out any references */
403                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
404                         if (obd->obd_attached) {
405                                 read_unlock(&obd_dev_lock);
406                                 return i;
407                         }
408                         break;
409                 }
410         }
411         read_unlock(&obd_dev_lock);
412
413         return -1;
414 }
415 EXPORT_SYMBOL(class_name2dev);
416
417 struct obd_device *class_name2obd(const char *name)
418 {
419         int dev = class_name2dev(name);
420
421         if (dev < 0 || dev > class_devno_max())
422                 return NULL;
423         return class_num2obd(dev);
424 }
425 EXPORT_SYMBOL(class_name2obd);
426
427 int class_uuid2dev(struct obd_uuid *uuid)
428 {
429         int i;
430
431         read_lock(&obd_dev_lock);
432         for (i = 0; i < class_devno_max(); i++) {
433                 struct obd_device *obd = class_num2obd(i);
434
435                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
436                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
437                         read_unlock(&obd_dev_lock);
438                         return i;
439                 }
440         }
441         read_unlock(&obd_dev_lock);
442
443         return -1;
444 }
445 EXPORT_SYMBOL(class_uuid2dev);
446
447 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
448 {
449         int dev = class_uuid2dev(uuid);
450         if (dev < 0)
451                 return NULL;
452         return class_num2obd(dev);
453 }
454 EXPORT_SYMBOL(class_uuid2obd);
455
456 /**
457  * Get obd device from ::obd_devs[]
458  *
459  * \param num [in] array index
460  *
461  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
462  *         otherwise return the obd device there.
463  */
464 struct obd_device *class_num2obd(int num)
465 {
466         struct obd_device *obd = NULL;
467
468         if (num < class_devno_max()) {
469                 obd = obd_devs[num];
470                 if (obd == NULL)
471                         return NULL;
472
473                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
474                          "%p obd_magic %08x != %08x\n",
475                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
476                 LASSERTF(obd->obd_minor == num,
477                          "%p obd_minor %0d != %0d\n",
478                          obd, obd->obd_minor, num);
479         }
480
481         return obd;
482 }
483 EXPORT_SYMBOL(class_num2obd);
484
485 /**
486  * Get obd devices count. Device in any
487  *    state are counted
488  * \retval obd device count
489  */
490 int get_devices_count(void)
491 {
492         int index, max_index = class_devno_max(), dev_count = 0;
493
494         read_lock(&obd_dev_lock);
495         for (index = 0; index <= max_index; index++) {
496                 struct obd_device *obd = class_num2obd(index);
497                 if (obd != NULL)
498                         dev_count++;
499         }
500         read_unlock(&obd_dev_lock);
501
502         return dev_count;
503 }
504 EXPORT_SYMBOL(get_devices_count);
505
506 void class_obd_list(void)
507 {
508         char *status;
509         int i;
510
511         read_lock(&obd_dev_lock);
512         for (i = 0; i < class_devno_max(); i++) {
513                 struct obd_device *obd = class_num2obd(i);
514
515                 if (obd == NULL)
516                         continue;
517                 if (obd->obd_stopping)
518                         status = "ST";
519                 else if (obd->obd_set_up)
520                         status = "UP";
521                 else if (obd->obd_attached)
522                         status = "AT";
523                 else
524                         status = "--";
525                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
526                          i, status, obd->obd_type->typ_name,
527                          obd->obd_name, obd->obd_uuid.uuid,
528                          cfs_atomic_read(&obd->obd_refcount));
529         }
530         read_unlock(&obd_dev_lock);
531         return;
532 }
533
534 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
535    specified, then only the client with that uuid is returned,
536    otherwise any client connected to the tgt is returned. */
537 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
538                                           const char * typ_name,
539                                           struct obd_uuid *grp_uuid)
540 {
541         int i;
542
543         read_lock(&obd_dev_lock);
544         for (i = 0; i < class_devno_max(); i++) {
545                 struct obd_device *obd = class_num2obd(i);
546
547                 if (obd == NULL)
548                         continue;
549                 if ((strncmp(obd->obd_type->typ_name, typ_name,
550                              strlen(typ_name)) == 0)) {
551                         if (obd_uuid_equals(tgt_uuid,
552                                             &obd->u.cli.cl_target_uuid) &&
553                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
554                                                          &obd->obd_uuid) : 1)) {
555                                 read_unlock(&obd_dev_lock);
556                                 return obd;
557                         }
558                 }
559         }
560         read_unlock(&obd_dev_lock);
561
562         return NULL;
563 }
564 EXPORT_SYMBOL(class_find_client_obd);
565
566 /* Iterate the obd_device list looking devices have grp_uuid. Start
567    searching at *next, and if a device is found, the next index to look
568    at is saved in *next. If next is NULL, then the first matching device
569    will always be returned. */
570 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
571 {
572         int i;
573
574         if (next == NULL)
575                 i = 0;
576         else if (*next >= 0 && *next < class_devno_max())
577                 i = *next;
578         else
579                 return NULL;
580
581         read_lock(&obd_dev_lock);
582         for (; i < class_devno_max(); i++) {
583                 struct obd_device *obd = class_num2obd(i);
584
585                 if (obd == NULL)
586                         continue;
587                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
588                         if (next != NULL)
589                                 *next = i+1;
590                         read_unlock(&obd_dev_lock);
591                         return obd;
592                 }
593         }
594         read_unlock(&obd_dev_lock);
595
596         return NULL;
597 }
598 EXPORT_SYMBOL(class_devices_in_group);
599
600 /**
601  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
602  * adjust sptlrpc settings accordingly.
603  */
604 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
605 {
606         struct obd_device  *obd;
607         const char         *type;
608         int                 i, rc = 0, rc2;
609
610         LASSERT(namelen > 0);
611
612         read_lock(&obd_dev_lock);
613         for (i = 0; i < class_devno_max(); i++) {
614                 obd = class_num2obd(i);
615
616                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
617                         continue;
618
619                 /* only notify mdc, osc, mdt, ost */
620                 type = obd->obd_type->typ_name;
621                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
622                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
623                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
624                     strcmp(type, LUSTRE_OST_NAME) != 0)
625                         continue;
626
627                 if (strncmp(obd->obd_name, fsname, namelen))
628                         continue;
629
630                 class_incref(obd, __FUNCTION__, obd);
631                 read_unlock(&obd_dev_lock);
632                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
633                                          sizeof(KEY_SPTLRPC_CONF),
634                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
635                 rc = rc ? rc : rc2;
636                 class_decref(obd, __FUNCTION__, obd);
637                 read_lock(&obd_dev_lock);
638         }
639         read_unlock(&obd_dev_lock);
640         return rc;
641 }
642 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
643
644 void obd_cleanup_caches(void)
645 {
646         int rc;
647
648         ENTRY;
649         if (obd_device_cachep) {
650                 rc = cfs_mem_cache_destroy(obd_device_cachep);
651                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
652                 obd_device_cachep = NULL;
653         }
654         if (obdo_cachep) {
655                 rc = cfs_mem_cache_destroy(obdo_cachep);
656                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
657                 obdo_cachep = NULL;
658         }
659         if (import_cachep) {
660                 rc = cfs_mem_cache_destroy(import_cachep);
661                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
662                 import_cachep = NULL;
663         }
664         if (capa_cachep) {
665                 rc = cfs_mem_cache_destroy(capa_cachep);
666                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
667                 capa_cachep = NULL;
668         }
669         EXIT;
670 }
671
672 int obd_init_caches(void)
673 {
674         ENTRY;
675
676         LASSERT(obd_device_cachep == NULL);
677         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
678                                                  sizeof(struct obd_device),
679                                                  0, 0);
680         if (!obd_device_cachep)
681                 GOTO(out, -ENOMEM);
682
683         LASSERT(obdo_cachep == NULL);
684         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
685                                            0, 0);
686         if (!obdo_cachep)
687                 GOTO(out, -ENOMEM);
688
689         LASSERT(import_cachep == NULL);
690         import_cachep = cfs_mem_cache_create("ll_import_cache",
691                                              sizeof(struct obd_import),
692                                              0, 0);
693         if (!import_cachep)
694                 GOTO(out, -ENOMEM);
695
696         LASSERT(capa_cachep == NULL);
697         capa_cachep = cfs_mem_cache_create("capa_cache",
698                                            sizeof(struct obd_capa), 0, 0);
699         if (!capa_cachep)
700                 GOTO(out, -ENOMEM);
701
702         RETURN(0);
703  out:
704         obd_cleanup_caches();
705         RETURN(-ENOMEM);
706
707 }
708
709 /* map connection to client */
710 struct obd_export *class_conn2export(struct lustre_handle *conn)
711 {
712         struct obd_export *export;
713         ENTRY;
714
715         if (!conn) {
716                 CDEBUG(D_CACHE, "looking for null handle\n");
717                 RETURN(NULL);
718         }
719
720         if (conn->cookie == -1) {  /* this means assign a new connection */
721                 CDEBUG(D_CACHE, "want a new connection\n");
722                 RETURN(NULL);
723         }
724
725         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
726         export = class_handle2object(conn->cookie);
727         RETURN(export);
728 }
729 EXPORT_SYMBOL(class_conn2export);
730
731 struct obd_device *class_exp2obd(struct obd_export *exp)
732 {
733         if (exp)
734                 return exp->exp_obd;
735         return NULL;
736 }
737 EXPORT_SYMBOL(class_exp2obd);
738
739 struct obd_device *class_conn2obd(struct lustre_handle *conn)
740 {
741         struct obd_export *export;
742         export = class_conn2export(conn);
743         if (export) {
744                 struct obd_device *obd = export->exp_obd;
745                 class_export_put(export);
746                 return obd;
747         }
748         return NULL;
749 }
750 EXPORT_SYMBOL(class_conn2obd);
751
752 struct obd_import *class_exp2cliimp(struct obd_export *exp)
753 {
754         struct obd_device *obd = exp->exp_obd;
755         if (obd == NULL)
756                 return NULL;
757         return obd->u.cli.cl_import;
758 }
759 EXPORT_SYMBOL(class_exp2cliimp);
760
761 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
762 {
763         struct obd_device *obd = class_conn2obd(conn);
764         if (obd == NULL)
765                 return NULL;
766         return obd->u.cli.cl_import;
767 }
768 EXPORT_SYMBOL(class_conn2cliimp);
769
770 /* Export management functions */
771 static void class_export_destroy(struct obd_export *exp)
772 {
773         struct obd_device *obd = exp->exp_obd;
774         ENTRY;
775
776         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
777         LASSERT(obd != NULL);
778
779         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
780                exp->exp_client_uuid.uuid, obd->obd_name);
781
782         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
783         if (exp->exp_connection)
784                 ptlrpc_put_connection_superhack(exp->exp_connection);
785
786         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
787         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
788         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
789         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
790         obd_destroy_export(exp);
791         class_decref(obd, "export", exp);
792
793         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
794         EXIT;
795 }
796
797 static void export_handle_addref(void *export)
798 {
799         class_export_get(export);
800 }
801
802 static struct portals_handle_ops export_handle_ops = {
803         .hop_addref = export_handle_addref,
804         .hop_free   = NULL,
805 };
806
807 struct obd_export *class_export_get(struct obd_export *exp)
808 {
809         cfs_atomic_inc(&exp->exp_refcount);
810         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
811                cfs_atomic_read(&exp->exp_refcount));
812         return exp;
813 }
814 EXPORT_SYMBOL(class_export_get);
815
816 void class_export_put(struct obd_export *exp)
817 {
818         LASSERT(exp != NULL);
819         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
820         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
821                cfs_atomic_read(&exp->exp_refcount) - 1);
822
823         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
824                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
825                 CDEBUG(D_IOCTL, "final put %p/%s\n",
826                        exp, exp->exp_client_uuid.uuid);
827
828                 /* release nid stat refererence */
829                 lprocfs_exp_cleanup(exp);
830
831                 obd_zombie_export_add(exp);
832         }
833 }
834 EXPORT_SYMBOL(class_export_put);
835
836 /* Creates a new export, adds it to the hash table, and returns a
837  * pointer to it. The refcount is 2: one for the hash reference, and
838  * one for the pointer returned by this function. */
839 struct obd_export *class_new_export(struct obd_device *obd,
840                                     struct obd_uuid *cluuid)
841 {
842         struct obd_export *export;
843         cfs_hash_t *hash = NULL;
844         int rc = 0;
845         ENTRY;
846
847         OBD_ALLOC_PTR(export);
848         if (!export)
849                 return ERR_PTR(-ENOMEM);
850
851         export->exp_conn_cnt = 0;
852         export->exp_lock_hash = NULL;
853         export->exp_flock_hash = NULL;
854         cfs_atomic_set(&export->exp_refcount, 2);
855         cfs_atomic_set(&export->exp_rpc_count, 0);
856         cfs_atomic_set(&export->exp_cb_count, 0);
857         cfs_atomic_set(&export->exp_locks_count, 0);
858 #if LUSTRE_TRACKS_LOCK_EXP_REFS
859         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
860         spin_lock_init(&export->exp_locks_list_guard);
861 #endif
862         cfs_atomic_set(&export->exp_replay_count, 0);
863         export->exp_obd = obd;
864         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
865         spin_lock_init(&export->exp_uncommitted_replies_lock);
866         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
867         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
868         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
869         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
870         class_handle_hash(&export->exp_handle, &export_handle_ops);
871         export->exp_last_request_time = cfs_time_current_sec();
872         spin_lock_init(&export->exp_lock);
873         spin_lock_init(&export->exp_rpc_lock);
874         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
875         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
876         spin_lock_init(&export->exp_bl_list_lock);
877         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
878
879         export->exp_sp_peer = LUSTRE_SP_ANY;
880         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
881         export->exp_client_uuid = *cluuid;
882         obd_init_export(export);
883
884         spin_lock(&obd->obd_dev_lock);
885         /* shouldn't happen, but might race */
886         if (obd->obd_stopping)
887                 GOTO(exit_unlock, rc = -ENODEV);
888
889         hash = cfs_hash_getref(obd->obd_uuid_hash);
890         if (hash == NULL)
891                 GOTO(exit_unlock, rc = -ENODEV);
892         spin_unlock(&obd->obd_dev_lock);
893
894         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
895                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
896                 if (rc != 0) {
897                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
898                                       obd->obd_name, cluuid->uuid, rc);
899                         GOTO(exit_err, rc = -EALREADY);
900                 }
901         }
902
903         spin_lock(&obd->obd_dev_lock);
904         if (obd->obd_stopping) {
905                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
906                 GOTO(exit_unlock, rc = -ENODEV);
907         }
908
909         class_incref(obd, "export", export);
910         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
911         cfs_list_add_tail(&export->exp_obd_chain_timed,
912                           &export->exp_obd->obd_exports_timed);
913         export->exp_obd->obd_num_exports++;
914         spin_unlock(&obd->obd_dev_lock);
915         cfs_hash_putref(hash);
916         RETURN(export);
917
918 exit_unlock:
919         spin_unlock(&obd->obd_dev_lock);
920 exit_err:
921         if (hash)
922                 cfs_hash_putref(hash);
923         class_handle_unhash(&export->exp_handle);
924         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
925         obd_destroy_export(export);
926         OBD_FREE_PTR(export);
927         return ERR_PTR(rc);
928 }
929 EXPORT_SYMBOL(class_new_export);
930
931 void class_unlink_export(struct obd_export *exp)
932 {
933         class_handle_unhash(&exp->exp_handle);
934
935         spin_lock(&exp->exp_obd->obd_dev_lock);
936         /* delete an uuid-export hashitem from hashtables */
937         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
938                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
939                              &exp->exp_client_uuid,
940                              &exp->exp_uuid_hash);
941
942         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
943         cfs_list_del_init(&exp->exp_obd_chain_timed);
944         exp->exp_obd->obd_num_exports--;
945         spin_unlock(&exp->exp_obd->obd_dev_lock);
946         class_export_put(exp);
947 }
948 EXPORT_SYMBOL(class_unlink_export);
949
950 /* Import management functions */
951 void class_import_destroy(struct obd_import *imp)
952 {
953         ENTRY;
954
955         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
956                 imp->imp_obd->obd_name);
957
958         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
959
960         ptlrpc_put_connection_superhack(imp->imp_connection);
961
962         while (!cfs_list_empty(&imp->imp_conn_list)) {
963                 struct obd_import_conn *imp_conn;
964
965                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
966                                           struct obd_import_conn, oic_item);
967                 cfs_list_del_init(&imp_conn->oic_item);
968                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
969                 OBD_FREE(imp_conn, sizeof(*imp_conn));
970         }
971
972         LASSERT(imp->imp_sec == NULL);
973         class_decref(imp->imp_obd, "import", imp);
974         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
975         EXIT;
976 }
977
978 static void import_handle_addref(void *import)
979 {
980         class_import_get(import);
981 }
982
983 static struct portals_handle_ops import_handle_ops = {
984         .hop_addref = import_handle_addref,
985         .hop_free   = NULL,
986 };
987
988 struct obd_import *class_import_get(struct obd_import *import)
989 {
990         cfs_atomic_inc(&import->imp_refcount);
991         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
992                cfs_atomic_read(&import->imp_refcount),
993                import->imp_obd->obd_name);
994         return import;
995 }
996 EXPORT_SYMBOL(class_import_get);
997
998 void class_import_put(struct obd_import *imp)
999 {
1000         ENTRY;
1001
1002         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1003         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1004
1005         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1006                cfs_atomic_read(&imp->imp_refcount) - 1,
1007                imp->imp_obd->obd_name);
1008
1009         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1010                 CDEBUG(D_INFO, "final put import %p\n", imp);
1011                 obd_zombie_import_add(imp);
1012         }
1013
1014         /* catch possible import put race */
1015         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1016         EXIT;
1017 }
1018 EXPORT_SYMBOL(class_import_put);
1019
1020 static void init_imp_at(struct imp_at *at) {
1021         int i;
1022         at_init(&at->iat_net_latency, 0, 0);
1023         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1024                 /* max service estimates are tracked on the server side, so
1025                    don't use the AT history here, just use the last reported
1026                    val. (But keep hist for proc histogram, worst_ever) */
1027                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1028                         AT_FLG_NOHIST);
1029         }
1030 }
1031
1032 struct obd_import *class_new_import(struct obd_device *obd)
1033 {
1034         struct obd_import *imp;
1035
1036         OBD_ALLOC(imp, sizeof(*imp));
1037         if (imp == NULL)
1038                 return NULL;
1039
1040         CFS_INIT_LIST_HEAD(&imp->imp_pinger_chain);
1041         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1042         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1043         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1044         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1045         spin_lock_init(&imp->imp_lock);
1046         imp->imp_last_success_conn = 0;
1047         imp->imp_state = LUSTRE_IMP_NEW;
1048         imp->imp_obd = class_incref(obd, "import", imp);
1049         mutex_init(&imp->imp_sec_mutex);
1050         cfs_waitq_init(&imp->imp_recovery_waitq);
1051
1052         cfs_atomic_set(&imp->imp_refcount, 2);
1053         cfs_atomic_set(&imp->imp_unregistering, 0);
1054         cfs_atomic_set(&imp->imp_inflight, 0);
1055         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1056         cfs_atomic_set(&imp->imp_inval_count, 0);
1057         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1058         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1059         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1060         init_imp_at(&imp->imp_at);
1061
1062         /* the default magic is V2, will be used in connect RPC, and
1063          * then adjusted according to the flags in request/reply. */
1064         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1065
1066         return imp;
1067 }
1068 EXPORT_SYMBOL(class_new_import);
1069
1070 void class_destroy_import(struct obd_import *import)
1071 {
1072         LASSERT(import != NULL);
1073         LASSERT(import != LP_POISON);
1074
1075         class_handle_unhash(&import->imp_handle);
1076
1077         spin_lock(&import->imp_lock);
1078         import->imp_generation++;
1079         spin_unlock(&import->imp_lock);
1080         class_import_put(import);
1081 }
1082 EXPORT_SYMBOL(class_destroy_import);
1083
1084 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1085
1086 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1087 {
1088         spin_lock(&exp->exp_locks_list_guard);
1089
1090         LASSERT(lock->l_exp_refs_nr >= 0);
1091
1092         if (lock->l_exp_refs_target != NULL &&
1093             lock->l_exp_refs_target != exp) {
1094                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1095                               exp, lock, lock->l_exp_refs_target);
1096         }
1097         if ((lock->l_exp_refs_nr ++) == 0) {
1098                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1099                 lock->l_exp_refs_target = exp;
1100         }
1101         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1102                lock, exp, lock->l_exp_refs_nr);
1103         spin_unlock(&exp->exp_locks_list_guard);
1104 }
1105 EXPORT_SYMBOL(__class_export_add_lock_ref);
1106
1107 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1108 {
1109         spin_lock(&exp->exp_locks_list_guard);
1110         LASSERT(lock->l_exp_refs_nr > 0);
1111         if (lock->l_exp_refs_target != exp) {
1112                 LCONSOLE_WARN("lock %p, "
1113                               "mismatching export pointers: %p, %p\n",
1114                               lock, lock->l_exp_refs_target, exp);
1115         }
1116         if (-- lock->l_exp_refs_nr == 0) {
1117                 cfs_list_del_init(&lock->l_exp_refs_link);
1118                 lock->l_exp_refs_target = NULL;
1119         }
1120         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1121                lock, exp, lock->l_exp_refs_nr);
1122         spin_unlock(&exp->exp_locks_list_guard);
1123 }
1124 EXPORT_SYMBOL(__class_export_del_lock_ref);
1125 #endif
1126
1127 /* A connection defines an export context in which preallocation can
1128    be managed. This releases the export pointer reference, and returns
1129    the export handle, so the export refcount is 1 when this function
1130    returns. */
1131 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1132                   struct obd_uuid *cluuid)
1133 {
1134         struct obd_export *export;
1135         LASSERT(conn != NULL);
1136         LASSERT(obd != NULL);
1137         LASSERT(cluuid != NULL);
1138         ENTRY;
1139
1140         export = class_new_export(obd, cluuid);
1141         if (IS_ERR(export))
1142                 RETURN(PTR_ERR(export));
1143
1144         conn->cookie = export->exp_handle.h_cookie;
1145         class_export_put(export);
1146
1147         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1148                cluuid->uuid, conn->cookie);
1149         RETURN(0);
1150 }
1151 EXPORT_SYMBOL(class_connect);
1152
1153 /* if export is involved in recovery then clean up related things */
1154 void class_export_recovery_cleanup(struct obd_export *exp)
1155 {
1156         struct obd_device *obd = exp->exp_obd;
1157
1158         spin_lock(&obd->obd_recovery_task_lock);
1159         if (exp->exp_delayed)
1160                 obd->obd_delayed_clients--;
1161         if (obd->obd_recovering) {
1162                 if (exp->exp_in_recovery) {
1163                         spin_lock(&exp->exp_lock);
1164                         exp->exp_in_recovery = 0;
1165                         spin_unlock(&exp->exp_lock);
1166                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1167                         cfs_atomic_dec(&obd->obd_connected_clients);
1168                 }
1169
1170                 /* if called during recovery then should update
1171                  * obd_stale_clients counter,
1172                  * lightweight exports are not counted */
1173                 if (exp->exp_failed &&
1174                     (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1175                         exp->exp_obd->obd_stale_clients++;
1176         }
1177         spin_unlock(&obd->obd_recovery_task_lock);
1178         /** Cleanup req replay fields */
1179         if (exp->exp_req_replay_needed) {
1180                 spin_lock(&exp->exp_lock);
1181                 exp->exp_req_replay_needed = 0;
1182                 spin_unlock(&exp->exp_lock);
1183                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1184                 cfs_atomic_dec(&obd->obd_req_replay_clients);
1185         }
1186         /** Cleanup lock replay data */
1187         if (exp->exp_lock_replay_needed) {
1188                 spin_lock(&exp->exp_lock);
1189                 exp->exp_lock_replay_needed = 0;
1190                 spin_unlock(&exp->exp_lock);
1191                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1192                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1193         }
1194 }
1195
1196 /* This function removes 1-3 references from the export:
1197  * 1 - for export pointer passed
1198  * and if disconnect really need
1199  * 2 - removing from hash
1200  * 3 - in client_unlink_export
1201  * The export pointer passed to this function can destroyed */
1202 int class_disconnect(struct obd_export *export)
1203 {
1204         int already_disconnected;
1205         ENTRY;
1206
1207         if (export == NULL) {
1208                 CWARN("attempting to free NULL export %p\n", export);
1209                 RETURN(-EINVAL);
1210         }
1211
1212         spin_lock(&export->exp_lock);
1213         already_disconnected = export->exp_disconnected;
1214         export->exp_disconnected = 1;
1215         spin_unlock(&export->exp_lock);
1216
1217         /* class_cleanup(), abort_recovery(), and class_fail_export()
1218          * all end up in here, and if any of them race we shouldn't
1219          * call extra class_export_puts(). */
1220         if (already_disconnected) {
1221                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1222                 GOTO(no_disconn, already_disconnected);
1223         }
1224
1225         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1226                export->exp_handle.h_cookie);
1227
1228         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1229                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1230                              &export->exp_connection->c_peer.nid,
1231                              &export->exp_nid_hash);
1232
1233         class_export_recovery_cleanup(export);
1234         class_unlink_export(export);
1235 no_disconn:
1236         class_export_put(export);
1237         RETURN(0);
1238 }
1239 EXPORT_SYMBOL(class_disconnect);
1240
1241 /* Return non-zero for a fully connected export */
1242 int class_connected_export(struct obd_export *exp)
1243 {
1244         if (exp) {
1245                 int connected;
1246                 spin_lock(&exp->exp_lock);
1247                 connected = (exp->exp_conn_cnt > 0);
1248                 spin_unlock(&exp->exp_lock);
1249                 return connected;
1250         }
1251         return 0;
1252 }
1253 EXPORT_SYMBOL(class_connected_export);
1254
1255 static void class_disconnect_export_list(cfs_list_t *list,
1256                                          enum obd_option flags)
1257 {
1258         int rc;
1259         struct obd_export *exp;
1260         ENTRY;
1261
1262         /* It's possible that an export may disconnect itself, but
1263          * nothing else will be added to this list. */
1264         while (!cfs_list_empty(list)) {
1265                 exp = cfs_list_entry(list->next, struct obd_export,
1266                                      exp_obd_chain);
1267                 /* need for safe call CDEBUG after obd_disconnect */
1268                 class_export_get(exp);
1269
1270                 spin_lock(&exp->exp_lock);
1271                 exp->exp_flags = flags;
1272                 spin_unlock(&exp->exp_lock);
1273
1274                 if (obd_uuid_equals(&exp->exp_client_uuid,
1275                                     &exp->exp_obd->obd_uuid)) {
1276                         CDEBUG(D_HA,
1277                                "exp %p export uuid == obd uuid, don't discon\n",
1278                                exp);
1279                         /* Need to delete this now so we don't end up pointing
1280                          * to work_list later when this export is cleaned up. */
1281                         cfs_list_del_init(&exp->exp_obd_chain);
1282                         class_export_put(exp);
1283                         continue;
1284                 }
1285
1286                 class_export_get(exp);
1287                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1288                        "last request at "CFS_TIME_T"\n",
1289                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1290                        exp, exp->exp_last_request_time);
1291                 /* release one export reference anyway */
1292                 rc = obd_disconnect(exp);
1293
1294                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1295                        obd_export_nid2str(exp), exp, rc);
1296                 class_export_put(exp);
1297         }
1298         EXIT;
1299 }
1300
1301 void class_disconnect_exports(struct obd_device *obd)
1302 {
1303         cfs_list_t work_list;
1304         ENTRY;
1305
1306         /* Move all of the exports from obd_exports to a work list, en masse. */
1307         CFS_INIT_LIST_HEAD(&work_list);
1308         spin_lock(&obd->obd_dev_lock);
1309         cfs_list_splice_init(&obd->obd_exports, &work_list);
1310         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1311         spin_unlock(&obd->obd_dev_lock);
1312
1313         if (!cfs_list_empty(&work_list)) {
1314                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1315                        "disconnecting them\n", obd->obd_minor, obd);
1316                 class_disconnect_export_list(&work_list,
1317                                              exp_flags_from_obd(obd));
1318         } else
1319                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1320                        obd->obd_minor, obd);
1321         EXIT;
1322 }
1323 EXPORT_SYMBOL(class_disconnect_exports);
1324
1325 /* Remove exports that have not completed recovery.
1326  */
1327 void class_disconnect_stale_exports(struct obd_device *obd,
1328                                     int (*test_export)(struct obd_export *))
1329 {
1330         cfs_list_t work_list;
1331         struct obd_export *exp, *n;
1332         int evicted = 0;
1333         ENTRY;
1334
1335         CFS_INIT_LIST_HEAD(&work_list);
1336         spin_lock(&obd->obd_dev_lock);
1337         cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1338                                      exp_obd_chain) {
1339                 /* don't count self-export as client */
1340                 if (obd_uuid_equals(&exp->exp_client_uuid,
1341                                     &exp->exp_obd->obd_uuid))
1342                         continue;
1343
1344                 /* don't evict clients which have no slot in last_rcvd
1345                  * (e.g. lightweight connection) */
1346                 if (exp->exp_target_data.ted_lr_idx == -1)
1347                         continue;
1348
1349                 spin_lock(&exp->exp_lock);
1350                 if (exp->exp_failed || test_export(exp)) {
1351                         spin_unlock(&exp->exp_lock);
1352                         continue;
1353                 }
1354                 exp->exp_failed = 1;
1355                 spin_unlock(&exp->exp_lock);
1356
1357                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1358                 evicted++;
1359                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1360                        obd->obd_name, exp->exp_client_uuid.uuid,
1361                        exp->exp_connection == NULL ? "<unknown>" :
1362                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1363                 print_export_data(exp, "EVICTING", 0);
1364         }
1365         spin_unlock(&obd->obd_dev_lock);
1366
1367         if (evicted)
1368                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1369                               obd->obd_name, evicted);
1370
1371         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1372                                                  OBD_OPT_ABORT_RECOV);
1373         EXIT;
1374 }
1375 EXPORT_SYMBOL(class_disconnect_stale_exports);
1376
1377 void class_fail_export(struct obd_export *exp)
1378 {
1379         int rc, already_failed;
1380
1381         spin_lock(&exp->exp_lock);
1382         already_failed = exp->exp_failed;
1383         exp->exp_failed = 1;
1384         spin_unlock(&exp->exp_lock);
1385
1386         if (already_failed) {
1387                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1388                        exp, exp->exp_client_uuid.uuid);
1389                 return;
1390         }
1391
1392         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1393                exp, exp->exp_client_uuid.uuid);
1394
1395         if (obd_dump_on_timeout)
1396                 libcfs_debug_dumplog();
1397
1398         /* need for safe call CDEBUG after obd_disconnect */
1399         class_export_get(exp);
1400
1401         /* Most callers into obd_disconnect are removing their own reference
1402          * (request, for example) in addition to the one from the hash table.
1403          * We don't have such a reference here, so make one. */
1404         class_export_get(exp);
1405         rc = obd_disconnect(exp);
1406         if (rc)
1407                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1408         else
1409                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1410                        exp, exp->exp_client_uuid.uuid);
1411         class_export_put(exp);
1412 }
1413 EXPORT_SYMBOL(class_fail_export);
1414
1415 char *obd_export_nid2str(struct obd_export *exp)
1416 {
1417         if (exp->exp_connection != NULL)
1418                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1419
1420         return "(no nid)";
1421 }
1422 EXPORT_SYMBOL(obd_export_nid2str);
1423
1424 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1425 {
1426         cfs_hash_t *nid_hash;
1427         struct obd_export *doomed_exp = NULL;
1428         int exports_evicted = 0;
1429
1430         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1431
1432         spin_lock(&obd->obd_dev_lock);
1433         /* umount has run already, so evict thread should leave
1434          * its task to umount thread now */
1435         if (obd->obd_stopping) {
1436                 spin_unlock(&obd->obd_dev_lock);
1437                 return exports_evicted;
1438         }
1439         nid_hash = obd->obd_nid_hash;
1440         cfs_hash_getref(nid_hash);
1441         spin_unlock(&obd->obd_dev_lock);
1442
1443         do {
1444                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1445                 if (doomed_exp == NULL)
1446                         break;
1447
1448                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1449                          "nid %s found, wanted nid %s, requested nid %s\n",
1450                          obd_export_nid2str(doomed_exp),
1451                          libcfs_nid2str(nid_key), nid);
1452                 LASSERTF(doomed_exp != obd->obd_self_export,
1453                          "self-export is hashed by NID?\n");
1454                 exports_evicted++;
1455                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1456                               "request\n", obd->obd_name,
1457                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1458                               obd_export_nid2str(doomed_exp));
1459                 class_fail_export(doomed_exp);
1460                 class_export_put(doomed_exp);
1461         } while (1);
1462
1463         cfs_hash_putref(nid_hash);
1464
1465         if (!exports_evicted)
1466                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1467                        obd->obd_name, nid);
1468         return exports_evicted;
1469 }
1470 EXPORT_SYMBOL(obd_export_evict_by_nid);
1471
1472 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1473 {
1474         cfs_hash_t *uuid_hash;
1475         struct obd_export *doomed_exp = NULL;
1476         struct obd_uuid doomed_uuid;
1477         int exports_evicted = 0;
1478
1479         spin_lock(&obd->obd_dev_lock);
1480         if (obd->obd_stopping) {
1481                 spin_unlock(&obd->obd_dev_lock);
1482                 return exports_evicted;
1483         }
1484         uuid_hash = obd->obd_uuid_hash;
1485         cfs_hash_getref(uuid_hash);
1486         spin_unlock(&obd->obd_dev_lock);
1487
1488         obd_str2uuid(&doomed_uuid, uuid);
1489         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1490                 CERROR("%s: can't evict myself\n", obd->obd_name);
1491                 cfs_hash_putref(uuid_hash);
1492                 return exports_evicted;
1493         }
1494
1495         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1496
1497         if (doomed_exp == NULL) {
1498                 CERROR("%s: can't disconnect %s: no exports found\n",
1499                        obd->obd_name, uuid);
1500         } else {
1501                 CWARN("%s: evicting %s at adminstrative request\n",
1502                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1503                 class_fail_export(doomed_exp);
1504                 class_export_put(doomed_exp);
1505                 exports_evicted++;
1506         }
1507         cfs_hash_putref(uuid_hash);
1508
1509         return exports_evicted;
1510 }
1511 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1512
1513 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1514 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1515 EXPORT_SYMBOL(class_export_dump_hook);
1516 #endif
1517
1518 static void print_export_data(struct obd_export *exp, const char *status,
1519                               int locks)
1520 {
1521         struct ptlrpc_reply_state *rs;
1522         struct ptlrpc_reply_state *first_reply = NULL;
1523         int nreplies = 0;
1524
1525         spin_lock(&exp->exp_lock);
1526         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1527                                 rs_exp_list) {
1528                 if (nreplies == 0)
1529                         first_reply = rs;
1530                 nreplies++;
1531         }
1532         spin_unlock(&exp->exp_lock);
1533
1534         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1535                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1536                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1537                cfs_atomic_read(&exp->exp_rpc_count),
1538                cfs_atomic_read(&exp->exp_cb_count),
1539                cfs_atomic_read(&exp->exp_locks_count),
1540                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1541                nreplies, first_reply, nreplies > 3 ? "..." : "",
1542                exp->exp_last_committed);
1543 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1544         if (locks && class_export_dump_hook != NULL)
1545                 class_export_dump_hook(exp);
1546 #endif
1547 }
1548
1549 void dump_exports(struct obd_device *obd, int locks)
1550 {
1551         struct obd_export *exp;
1552
1553         spin_lock(&obd->obd_dev_lock);
1554         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1555                 print_export_data(exp, "ACTIVE", locks);
1556         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1557                 print_export_data(exp, "UNLINKED", locks);
1558         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1559                 print_export_data(exp, "DELAYED", locks);
1560         spin_unlock(&obd->obd_dev_lock);
1561         spin_lock(&obd_zombie_impexp_lock);
1562         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1563                 print_export_data(exp, "ZOMBIE", locks);
1564         spin_unlock(&obd_zombie_impexp_lock);
1565 }
1566 EXPORT_SYMBOL(dump_exports);
1567
1568 void obd_exports_barrier(struct obd_device *obd)
1569 {
1570         int waited = 2;
1571         LASSERT(cfs_list_empty(&obd->obd_exports));
1572         spin_lock(&obd->obd_dev_lock);
1573         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1574                 spin_unlock(&obd->obd_dev_lock);
1575                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1576                                                    cfs_time_seconds(waited));
1577                 if (waited > 5 && IS_PO2(waited)) {
1578                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1579                                       "more than %d seconds. "
1580                                       "The obd refcount = %d. Is it stuck?\n",
1581                                       obd->obd_name, waited,
1582                                       cfs_atomic_read(&obd->obd_refcount));
1583                         dump_exports(obd, 1);
1584                 }
1585                 waited *= 2;
1586                 spin_lock(&obd->obd_dev_lock);
1587         }
1588         spin_unlock(&obd->obd_dev_lock);
1589 }
1590 EXPORT_SYMBOL(obd_exports_barrier);
1591
1592 /* Total amount of zombies to be destroyed */
1593 static int zombies_count = 0;
1594
1595 /**
1596  * kill zombie imports and exports
1597  */
1598 void obd_zombie_impexp_cull(void)
1599 {
1600         struct obd_import *import;
1601         struct obd_export *export;
1602         ENTRY;
1603
1604         do {
1605                 spin_lock(&obd_zombie_impexp_lock);
1606
1607                 import = NULL;
1608                 if (!cfs_list_empty(&obd_zombie_imports)) {
1609                         import = cfs_list_entry(obd_zombie_imports.next,
1610                                                 struct obd_import,
1611                                                 imp_zombie_chain);
1612                         cfs_list_del_init(&import->imp_zombie_chain);
1613                 }
1614
1615                 export = NULL;
1616                 if (!cfs_list_empty(&obd_zombie_exports)) {
1617                         export = cfs_list_entry(obd_zombie_exports.next,
1618                                                 struct obd_export,
1619                                                 exp_obd_chain);
1620                         cfs_list_del_init(&export->exp_obd_chain);
1621                 }
1622
1623                 spin_unlock(&obd_zombie_impexp_lock);
1624
1625                 if (import != NULL) {
1626                         class_import_destroy(import);
1627                         spin_lock(&obd_zombie_impexp_lock);
1628                         zombies_count--;
1629                         spin_unlock(&obd_zombie_impexp_lock);
1630                 }
1631
1632                 if (export != NULL) {
1633                         class_export_destroy(export);
1634                         spin_lock(&obd_zombie_impexp_lock);
1635                         zombies_count--;
1636                         spin_unlock(&obd_zombie_impexp_lock);
1637                 }
1638
1639                 cfs_cond_resched();
1640         } while (import != NULL || export != NULL);
1641         EXIT;
1642 }
1643
1644 static struct completion        obd_zombie_start;
1645 static struct completion        obd_zombie_stop;
1646 static unsigned long            obd_zombie_flags;
1647 static cfs_waitq_t              obd_zombie_waitq;
1648 static pid_t                    obd_zombie_pid;
1649
1650 enum {
1651         OBD_ZOMBIE_STOP         = 0x0001,
1652 };
1653
1654 /**
1655  * check for work for kill zombie import/export thread.
1656  */
1657 static int obd_zombie_impexp_check(void *arg)
1658 {
1659         int rc;
1660
1661         spin_lock(&obd_zombie_impexp_lock);
1662         rc = (zombies_count == 0) &&
1663              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1664         spin_unlock(&obd_zombie_impexp_lock);
1665
1666         RETURN(rc);
1667 }
1668
1669 /**
1670  * Add export to the obd_zombe thread and notify it.
1671  */
1672 static void obd_zombie_export_add(struct obd_export *exp) {
1673         spin_lock(&exp->exp_obd->obd_dev_lock);
1674         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1675         cfs_list_del_init(&exp->exp_obd_chain);
1676         spin_unlock(&exp->exp_obd->obd_dev_lock);
1677         spin_lock(&obd_zombie_impexp_lock);
1678         zombies_count++;
1679         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1680         spin_unlock(&obd_zombie_impexp_lock);
1681
1682         obd_zombie_impexp_notify();
1683 }
1684
1685 /**
1686  * Add import to the obd_zombe thread and notify it.
1687  */
1688 static void obd_zombie_import_add(struct obd_import *imp) {
1689         LASSERT(imp->imp_sec == NULL);
1690         LASSERT(imp->imp_rq_pool == NULL);
1691         spin_lock(&obd_zombie_impexp_lock);
1692         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1693         zombies_count++;
1694         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1695         spin_unlock(&obd_zombie_impexp_lock);
1696
1697         obd_zombie_impexp_notify();
1698 }
1699
1700 /**
1701  * notify import/export destroy thread about new zombie.
1702  */
1703 static void obd_zombie_impexp_notify(void)
1704 {
1705         /*
1706          * Make sure obd_zomebie_impexp_thread get this notification.
1707          * It is possible this signal only get by obd_zombie_barrier, and
1708          * barrier gulps this notification and sleeps away and hangs ensues
1709          */
1710         cfs_waitq_broadcast(&obd_zombie_waitq);
1711 }
1712
1713 /**
1714  * check whether obd_zombie is idle
1715  */
1716 static int obd_zombie_is_idle(void)
1717 {
1718         int rc;
1719
1720         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1721         spin_lock(&obd_zombie_impexp_lock);
1722         rc = (zombies_count == 0);
1723         spin_unlock(&obd_zombie_impexp_lock);
1724         return rc;
1725 }
1726
1727 /**
1728  * wait when obd_zombie import/export queues become empty
1729  */
1730 void obd_zombie_barrier(void)
1731 {
1732         struct l_wait_info lwi = { 0 };
1733
1734         if (obd_zombie_pid == cfs_curproc_pid())
1735                 /* don't wait for myself */
1736                 return;
1737         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1738 }
1739 EXPORT_SYMBOL(obd_zombie_barrier);
1740
1741 #ifdef __KERNEL__
1742
1743 /**
1744  * destroy zombie export/import thread.
1745  */
1746 static int obd_zombie_impexp_thread(void *unused)
1747 {
1748         int rc;
1749
1750         rc = cfs_daemonize_ctxt("obd_zombid");
1751         if (rc != 0) {
1752                 complete(&obd_zombie_start);
1753                 RETURN(rc);
1754         }
1755
1756         complete(&obd_zombie_start);
1757
1758         obd_zombie_pid = cfs_curproc_pid();
1759
1760         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1761                 struct l_wait_info lwi = { 0 };
1762
1763                 l_wait_event(obd_zombie_waitq,
1764                              !obd_zombie_impexp_check(NULL), &lwi);
1765                 obd_zombie_impexp_cull();
1766
1767                 /*
1768                  * Notify obd_zombie_barrier callers that queues
1769                  * may be empty.
1770                  */
1771                 cfs_waitq_signal(&obd_zombie_waitq);
1772         }
1773
1774         complete(&obd_zombie_stop);
1775
1776         RETURN(0);
1777 }
1778
1779 #else /* ! KERNEL */
1780
1781 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1782 static void *obd_zombie_impexp_work_cb;
1783 static void *obd_zombie_impexp_idle_cb;
1784
1785 int obd_zombie_impexp_kill(void *arg)
1786 {
1787         int rc = 0;
1788
1789         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1790                 obd_zombie_impexp_cull();
1791                 rc = 1;
1792         }
1793         cfs_atomic_dec(&zombie_recur);
1794         return rc;
1795 }
1796
1797 #endif
1798
1799 /**
1800  * start destroy zombie import/export thread
1801  */
1802 int obd_zombie_impexp_init(void)
1803 {
1804         int rc;
1805
1806         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1807         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1808         spin_lock_init(&obd_zombie_impexp_lock);
1809         init_completion(&obd_zombie_start);
1810         init_completion(&obd_zombie_stop);
1811         cfs_waitq_init(&obd_zombie_waitq);
1812         obd_zombie_pid = 0;
1813
1814 #ifdef __KERNEL__
1815         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1816         if (rc < 0)
1817                 RETURN(rc);
1818
1819         wait_for_completion(&obd_zombie_start);
1820 #else
1821
1822         obd_zombie_impexp_work_cb =
1823                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1824                                                  &obd_zombie_impexp_kill, NULL);
1825
1826         obd_zombie_impexp_idle_cb =
1827                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1828                                                  &obd_zombie_impexp_check, NULL);
1829         rc = 0;
1830 #endif
1831         RETURN(rc);
1832 }
1833 /**
1834  * stop destroy zombie import/export thread
1835  */
1836 void obd_zombie_impexp_stop(void)
1837 {
1838         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1839         obd_zombie_impexp_notify();
1840 #ifdef __KERNEL__
1841         wait_for_completion(&obd_zombie_stop);
1842 #else
1843         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1844         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1845 #endif
1846 }
1847
1848 /***** Kernel-userspace comm helpers *******/
1849
1850 /* Get length of entire message, including header */
1851 int kuc_len(int payload_len)
1852 {
1853         return sizeof(struct kuc_hdr) + payload_len;
1854 }
1855 EXPORT_SYMBOL(kuc_len);
1856
1857 /* Get a pointer to kuc header, given a ptr to the payload
1858  * @param p Pointer to payload area
1859  * @returns Pointer to kuc header
1860  */
1861 struct kuc_hdr * kuc_ptr(void *p)
1862 {
1863         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1864         LASSERT(lh->kuc_magic == KUC_MAGIC);
1865         return lh;
1866 }
1867 EXPORT_SYMBOL(kuc_ptr);
1868
1869 /* Test if payload is part of kuc message
1870  * @param p Pointer to payload area
1871  * @returns boolean
1872  */
1873 int kuc_ispayload(void *p)
1874 {
1875         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1876
1877         if (kh->kuc_magic == KUC_MAGIC)
1878                 return 1;
1879         else
1880                 return 0;
1881 }
1882 EXPORT_SYMBOL(kuc_ispayload);
1883
1884 /* Alloc space for a message, and fill in header
1885  * @return Pointer to payload area
1886  */
1887 void *kuc_alloc(int payload_len, int transport, int type)
1888 {
1889         struct kuc_hdr *lh;
1890         int len = kuc_len(payload_len);
1891
1892         OBD_ALLOC(lh, len);
1893         if (lh == NULL)
1894                 return ERR_PTR(-ENOMEM);
1895
1896         lh->kuc_magic = KUC_MAGIC;
1897         lh->kuc_transport = transport;
1898         lh->kuc_msgtype = type;
1899         lh->kuc_msglen = len;
1900
1901         return (void *)(lh + 1);
1902 }
1903 EXPORT_SYMBOL(kuc_alloc);
1904
1905 /* Takes pointer to payload area */
1906 inline void kuc_free(void *p, int payload_len)
1907 {
1908         struct kuc_hdr *lh = kuc_ptr(p);
1909         OBD_FREE(lh, kuc_len(payload_len));
1910 }
1911 EXPORT_SYMBOL(kuc_free);
1912
1913
1914