Whamcloud - gitweb
LU-3409 llite: silence lockdep warning in ll_md_blocking_ast
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
50 extern cfs_list_t obd_types;
51 spinlock_t obd_types_lock;
52
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
57
58 cfs_list_t      obd_zombie_imports;
59 cfs_list_t      obd_zombie_exports;
60 spinlock_t  obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         cfs_list_t *tmp;
102         struct obd_type *type;
103
104         spin_lock(&obd_types_lock);
105         cfs_list_for_each(tmp, &obd_types) {
106                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115 EXPORT_SYMBOL(class_search_type);
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124
125                 if (strcmp(modname, "obdfilter") == 0)
126                         modname = "ofd";
127
128                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129                         modname = LUSTRE_OSP_NAME;
130
131                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132                         modname = LUSTRE_MDT_NAME;
133
134                 if (!cfs_request_module("%s", modname)) {
135                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136                         type = class_search_type(name);
137                 } else {
138                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139                                            modname);
140                 }
141         }
142 #endif
143         if (type) {
144                 spin_lock(&type->obd_type_lock);
145                 type->typ_refcnt++;
146                 cfs_try_module_get(type->typ_dt_ops->o_owner);
147                 spin_unlock(&type->obd_type_lock);
148         }
149         return type;
150 }
151 EXPORT_SYMBOL(class_get_type);
152
153 void class_put_type(struct obd_type *type)
154 {
155         LASSERT(type);
156         spin_lock(&type->obd_type_lock);
157         type->typ_refcnt--;
158         cfs_module_put(type->typ_dt_ops->o_owner);
159         spin_unlock(&type->obd_type_lock);
160 }
161 EXPORT_SYMBOL(class_put_type);
162
163 #define CLASS_MAX_NAME 1024
164
165 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
166                         struct lprocfs_vars *vars, const char *name,
167                         struct lu_device_type *ldt)
168 {
169         struct obd_type *type;
170         int rc = 0;
171         ENTRY;
172
173         /* sanity check */
174         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
175
176         if (class_search_type(name)) {
177                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
178                 RETURN(-EEXIST);
179         }
180
181         rc = -ENOMEM;
182         OBD_ALLOC(type, sizeof(*type));
183         if (type == NULL)
184                 RETURN(rc);
185
186         OBD_ALLOC_PTR(type->typ_dt_ops);
187         OBD_ALLOC_PTR(type->typ_md_ops);
188         OBD_ALLOC(type->typ_name, strlen(name) + 1);
189
190         if (type->typ_dt_ops == NULL ||
191             type->typ_md_ops == NULL ||
192             type->typ_name == NULL)
193                 GOTO (failed, rc);
194
195         *(type->typ_dt_ops) = *dt_ops;
196         /* md_ops is optional */
197         if (md_ops)
198                 *(type->typ_md_ops) = *md_ops;
199         strcpy(type->typ_name, name);
200         spin_lock_init(&type->obd_type_lock);
201
202 #ifdef LPROCFS
203         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
204                                               vars, type);
205         if (IS_ERR(type->typ_procroot)) {
206                 rc = PTR_ERR(type->typ_procroot);
207                 type->typ_procroot = NULL;
208                 GOTO (failed, rc);
209         }
210 #endif
211         if (ldt != NULL) {
212                 type->typ_lu = ldt;
213                 rc = lu_device_type_init(ldt);
214                 if (rc != 0)
215                         GOTO (failed, rc);
216         }
217
218         spin_lock(&obd_types_lock);
219         cfs_list_add(&type->typ_chain, &obd_types);
220         spin_unlock(&obd_types_lock);
221
222         RETURN (0);
223
224  failed:
225         if (type->typ_name != NULL)
226                 OBD_FREE(type->typ_name, strlen(name) + 1);
227         if (type->typ_md_ops != NULL)
228                 OBD_FREE_PTR(type->typ_md_ops);
229         if (type->typ_dt_ops != NULL)
230                 OBD_FREE_PTR(type->typ_dt_ops);
231         OBD_FREE(type, sizeof(*type));
232         RETURN(rc);
233 }
234 EXPORT_SYMBOL(class_register_type);
235
236 int class_unregister_type(const char *name)
237 {
238         struct obd_type *type = class_search_type(name);
239         ENTRY;
240
241         if (!type) {
242                 CERROR("unknown obd type\n");
243                 RETURN(-EINVAL);
244         }
245
246         if (type->typ_refcnt) {
247                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
248                 /* This is a bad situation, let's make the best of it */
249                 /* Remove ops, but leave the name for debugging */
250                 OBD_FREE_PTR(type->typ_dt_ops);
251                 OBD_FREE_PTR(type->typ_md_ops);
252                 RETURN(-EBUSY);
253         }
254
255         /* we do not use type->typ_procroot as for compatibility purposes
256          * other modules can share names (i.e. lod can use lov entry). so
257          * we can't reference pointer as it can get invalided when another
258          * module removes the entry */
259         lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
260
261         if (type->typ_lu)
262                 lu_device_type_fini(type->typ_lu);
263
264         spin_lock(&obd_types_lock);
265         cfs_list_del(&type->typ_chain);
266         spin_unlock(&obd_types_lock);
267         OBD_FREE(type->typ_name, strlen(name) + 1);
268         if (type->typ_dt_ops != NULL)
269                 OBD_FREE_PTR(type->typ_dt_ops);
270         if (type->typ_md_ops != NULL)
271                 OBD_FREE_PTR(type->typ_md_ops);
272         OBD_FREE(type, sizeof(*type));
273         RETURN(0);
274 } /* class_unregister_type */
275 EXPORT_SYMBOL(class_unregister_type);
276
277 /**
278  * Create a new obd device.
279  *
280  * Find an empty slot in ::obd_devs[], create a new obd device in it.
281  *
282  * \param[in] type_name obd device type string.
283  * \param[in] name      obd device name.
284  *
285  * \retval NULL if create fails, otherwise return the obd device
286  *         pointer created.
287  */
288 struct obd_device *class_newdev(const char *type_name, const char *name)
289 {
290         struct obd_device *result = NULL;
291         struct obd_device *newdev;
292         struct obd_type *type = NULL;
293         int i;
294         int new_obd_minor = 0;
295         ENTRY;
296
297         if (strlen(name) >= MAX_OBD_NAME) {
298                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
299                 RETURN(ERR_PTR(-EINVAL));
300         }
301
302         type = class_get_type(type_name);
303         if (type == NULL){
304                 CERROR("OBD: unknown type: %s\n", type_name);
305                 RETURN(ERR_PTR(-ENODEV));
306         }
307
308         newdev = obd_device_alloc();
309         if (newdev == NULL)
310                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
311
312         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
313
314         write_lock(&obd_dev_lock);
315         for (i = 0; i < class_devno_max(); i++) {
316                 struct obd_device *obd = class_num2obd(i);
317
318                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
319                         CERROR("Device %s already exists at %d, won't add\n",
320                                name, i);
321                         if (result) {
322                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
323                                          "%p obd_magic %08x != %08x\n", result,
324                                          result->obd_magic, OBD_DEVICE_MAGIC);
325                                 LASSERTF(result->obd_minor == new_obd_minor,
326                                          "%p obd_minor %d != %d\n", result,
327                                          result->obd_minor, new_obd_minor);
328
329                                 obd_devs[result->obd_minor] = NULL;
330                                 result->obd_name[0]='\0';
331                          }
332                         result = ERR_PTR(-EEXIST);
333                         break;
334                 }
335                 if (!result && !obd) {
336                         result = newdev;
337                         result->obd_minor = i;
338                         new_obd_minor = i;
339                         result->obd_type = type;
340                         strncpy(result->obd_name, name,
341                                 sizeof(result->obd_name) - 1);
342                         obd_devs[i] = result;
343                 }
344         }
345         write_unlock(&obd_dev_lock);
346
347         if (result == NULL && i >= class_devno_max()) {
348                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
349                        class_devno_max());
350                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
351         }
352
353         if (IS_ERR(result))
354                 GOTO(out, result);
355
356         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
357                result->obd_name, result);
358
359         RETURN(result);
360 out:
361         obd_device_free(newdev);
362 out_type:
363         class_put_type(type);
364         return result;
365 }
366
367 void class_release_dev(struct obd_device *obd)
368 {
369         struct obd_type *obd_type = obd->obd_type;
370
371         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
372                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
373         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
374                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
375         LASSERT(obd_type != NULL);
376
377         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
378                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
379
380         write_lock(&obd_dev_lock);
381         obd_devs[obd->obd_minor] = NULL;
382         write_unlock(&obd_dev_lock);
383         obd_device_free(obd);
384
385         class_put_type(obd_type);
386 }
387
388 int class_name2dev(const char *name)
389 {
390         int i;
391
392         if (!name)
393                 return -1;
394
395         read_lock(&obd_dev_lock);
396         for (i = 0; i < class_devno_max(); i++) {
397                 struct obd_device *obd = class_num2obd(i);
398
399                 if (obd && strcmp(name, obd->obd_name) == 0) {
400                         /* Make sure we finished attaching before we give
401                            out any references */
402                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
403                         if (obd->obd_attached) {
404                                 read_unlock(&obd_dev_lock);
405                                 return i;
406                         }
407                         break;
408                 }
409         }
410         read_unlock(&obd_dev_lock);
411
412         return -1;
413 }
414 EXPORT_SYMBOL(class_name2dev);
415
416 struct obd_device *class_name2obd(const char *name)
417 {
418         int dev = class_name2dev(name);
419
420         if (dev < 0 || dev > class_devno_max())
421                 return NULL;
422         return class_num2obd(dev);
423 }
424 EXPORT_SYMBOL(class_name2obd);
425
426 int class_uuid2dev(struct obd_uuid *uuid)
427 {
428         int i;
429
430         read_lock(&obd_dev_lock);
431         for (i = 0; i < class_devno_max(); i++) {
432                 struct obd_device *obd = class_num2obd(i);
433
434                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
435                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
436                         read_unlock(&obd_dev_lock);
437                         return i;
438                 }
439         }
440         read_unlock(&obd_dev_lock);
441
442         return -1;
443 }
444 EXPORT_SYMBOL(class_uuid2dev);
445
446 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
447 {
448         int dev = class_uuid2dev(uuid);
449         if (dev < 0)
450                 return NULL;
451         return class_num2obd(dev);
452 }
453 EXPORT_SYMBOL(class_uuid2obd);
454
455 /**
456  * Get obd device from ::obd_devs[]
457  *
458  * \param num [in] array index
459  *
460  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
461  *         otherwise return the obd device there.
462  */
463 struct obd_device *class_num2obd(int num)
464 {
465         struct obd_device *obd = NULL;
466
467         if (num < class_devno_max()) {
468                 obd = obd_devs[num];
469                 if (obd == NULL)
470                         return NULL;
471
472                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
473                          "%p obd_magic %08x != %08x\n",
474                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
475                 LASSERTF(obd->obd_minor == num,
476                          "%p obd_minor %0d != %0d\n",
477                          obd, obd->obd_minor, num);
478         }
479
480         return obd;
481 }
482 EXPORT_SYMBOL(class_num2obd);
483
484 /**
485  * Get obd devices count. Device in any
486  *    state are counted
487  * \retval obd device count
488  */
489 int get_devices_count(void)
490 {
491         int index, max_index = class_devno_max(), dev_count = 0;
492
493         read_lock(&obd_dev_lock);
494         for (index = 0; index <= max_index; index++) {
495                 struct obd_device *obd = class_num2obd(index);
496                 if (obd != NULL)
497                         dev_count++;
498         }
499         read_unlock(&obd_dev_lock);
500
501         return dev_count;
502 }
503 EXPORT_SYMBOL(get_devices_count);
504
505 void class_obd_list(void)
506 {
507         char *status;
508         int i;
509
510         read_lock(&obd_dev_lock);
511         for (i = 0; i < class_devno_max(); i++) {
512                 struct obd_device *obd = class_num2obd(i);
513
514                 if (obd == NULL)
515                         continue;
516                 if (obd->obd_stopping)
517                         status = "ST";
518                 else if (obd->obd_set_up)
519                         status = "UP";
520                 else if (obd->obd_attached)
521                         status = "AT";
522                 else
523                         status = "--";
524                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
525                          i, status, obd->obd_type->typ_name,
526                          obd->obd_name, obd->obd_uuid.uuid,
527                          cfs_atomic_read(&obd->obd_refcount));
528         }
529         read_unlock(&obd_dev_lock);
530         return;
531 }
532
533 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
534    specified, then only the client with that uuid is returned,
535    otherwise any client connected to the tgt is returned. */
536 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
537                                           const char * typ_name,
538                                           struct obd_uuid *grp_uuid)
539 {
540         int i;
541
542         read_lock(&obd_dev_lock);
543         for (i = 0; i < class_devno_max(); i++) {
544                 struct obd_device *obd = class_num2obd(i);
545
546                 if (obd == NULL)
547                         continue;
548                 if ((strncmp(obd->obd_type->typ_name, typ_name,
549                              strlen(typ_name)) == 0)) {
550                         if (obd_uuid_equals(tgt_uuid,
551                                             &obd->u.cli.cl_target_uuid) &&
552                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
553                                                          &obd->obd_uuid) : 1)) {
554                                 read_unlock(&obd_dev_lock);
555                                 return obd;
556                         }
557                 }
558         }
559         read_unlock(&obd_dev_lock);
560
561         return NULL;
562 }
563 EXPORT_SYMBOL(class_find_client_obd);
564
565 /* Iterate the obd_device list looking devices have grp_uuid. Start
566    searching at *next, and if a device is found, the next index to look
567    at is saved in *next. If next is NULL, then the first matching device
568    will always be returned. */
569 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
570 {
571         int i;
572
573         if (next == NULL)
574                 i = 0;
575         else if (*next >= 0 && *next < class_devno_max())
576                 i = *next;
577         else
578                 return NULL;
579
580         read_lock(&obd_dev_lock);
581         for (; i < class_devno_max(); i++) {
582                 struct obd_device *obd = class_num2obd(i);
583
584                 if (obd == NULL)
585                         continue;
586                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
587                         if (next != NULL)
588                                 *next = i+1;
589                         read_unlock(&obd_dev_lock);
590                         return obd;
591                 }
592         }
593         read_unlock(&obd_dev_lock);
594
595         return NULL;
596 }
597 EXPORT_SYMBOL(class_devices_in_group);
598
599 /**
600  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
601  * adjust sptlrpc settings accordingly.
602  */
603 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
604 {
605         struct obd_device  *obd;
606         const char         *type;
607         int                 i, rc = 0, rc2;
608
609         LASSERT(namelen > 0);
610
611         read_lock(&obd_dev_lock);
612         for (i = 0; i < class_devno_max(); i++) {
613                 obd = class_num2obd(i);
614
615                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
616                         continue;
617
618                 /* only notify mdc, osc, mdt, ost */
619                 type = obd->obd_type->typ_name;
620                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
621                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
622                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
623                     strcmp(type, LUSTRE_OST_NAME) != 0)
624                         continue;
625
626                 if (strncmp(obd->obd_name, fsname, namelen))
627                         continue;
628
629                 class_incref(obd, __FUNCTION__, obd);
630                 read_unlock(&obd_dev_lock);
631                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
632                                          sizeof(KEY_SPTLRPC_CONF),
633                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
634                 rc = rc ? rc : rc2;
635                 class_decref(obd, __FUNCTION__, obd);
636                 read_lock(&obd_dev_lock);
637         }
638         read_unlock(&obd_dev_lock);
639         return rc;
640 }
641 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
642
643 void obd_cleanup_caches(void)
644 {
645         int rc;
646
647         ENTRY;
648         if (obd_device_cachep) {
649                 rc = cfs_mem_cache_destroy(obd_device_cachep);
650                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
651                 obd_device_cachep = NULL;
652         }
653         if (obdo_cachep) {
654                 rc = cfs_mem_cache_destroy(obdo_cachep);
655                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
656                 obdo_cachep = NULL;
657         }
658         if (import_cachep) {
659                 rc = cfs_mem_cache_destroy(import_cachep);
660                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
661                 import_cachep = NULL;
662         }
663         if (capa_cachep) {
664                 rc = cfs_mem_cache_destroy(capa_cachep);
665                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
666                 capa_cachep = NULL;
667         }
668         EXIT;
669 }
670
671 int obd_init_caches(void)
672 {
673         ENTRY;
674
675         LASSERT(obd_device_cachep == NULL);
676         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
677                                                  sizeof(struct obd_device),
678                                                  0, 0);
679         if (!obd_device_cachep)
680                 GOTO(out, -ENOMEM);
681
682         LASSERT(obdo_cachep == NULL);
683         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
684                                            0, 0);
685         if (!obdo_cachep)
686                 GOTO(out, -ENOMEM);
687
688         LASSERT(import_cachep == NULL);
689         import_cachep = cfs_mem_cache_create("ll_import_cache",
690                                              sizeof(struct obd_import),
691                                              0, 0);
692         if (!import_cachep)
693                 GOTO(out, -ENOMEM);
694
695         LASSERT(capa_cachep == NULL);
696         capa_cachep = cfs_mem_cache_create("capa_cache",
697                                            sizeof(struct obd_capa), 0, 0);
698         if (!capa_cachep)
699                 GOTO(out, -ENOMEM);
700
701         RETURN(0);
702  out:
703         obd_cleanup_caches();
704         RETURN(-ENOMEM);
705
706 }
707
708 /* map connection to client */
709 struct obd_export *class_conn2export(struct lustre_handle *conn)
710 {
711         struct obd_export *export;
712         ENTRY;
713
714         if (!conn) {
715                 CDEBUG(D_CACHE, "looking for null handle\n");
716                 RETURN(NULL);
717         }
718
719         if (conn->cookie == -1) {  /* this means assign a new connection */
720                 CDEBUG(D_CACHE, "want a new connection\n");
721                 RETURN(NULL);
722         }
723
724         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
725         export = class_handle2object(conn->cookie);
726         RETURN(export);
727 }
728 EXPORT_SYMBOL(class_conn2export);
729
730 struct obd_device *class_exp2obd(struct obd_export *exp)
731 {
732         if (exp)
733                 return exp->exp_obd;
734         return NULL;
735 }
736 EXPORT_SYMBOL(class_exp2obd);
737
738 struct obd_device *class_conn2obd(struct lustre_handle *conn)
739 {
740         struct obd_export *export;
741         export = class_conn2export(conn);
742         if (export) {
743                 struct obd_device *obd = export->exp_obd;
744                 class_export_put(export);
745                 return obd;
746         }
747         return NULL;
748 }
749 EXPORT_SYMBOL(class_conn2obd);
750
751 struct obd_import *class_exp2cliimp(struct obd_export *exp)
752 {
753         struct obd_device *obd = exp->exp_obd;
754         if (obd == NULL)
755                 return NULL;
756         return obd->u.cli.cl_import;
757 }
758 EXPORT_SYMBOL(class_exp2cliimp);
759
760 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
761 {
762         struct obd_device *obd = class_conn2obd(conn);
763         if (obd == NULL)
764                 return NULL;
765         return obd->u.cli.cl_import;
766 }
767 EXPORT_SYMBOL(class_conn2cliimp);
768
769 /* Export management functions */
770 static void class_export_destroy(struct obd_export *exp)
771 {
772         struct obd_device *obd = exp->exp_obd;
773         ENTRY;
774
775         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
776         LASSERT(obd != NULL);
777
778         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
779                exp->exp_client_uuid.uuid, obd->obd_name);
780
781         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
782         if (exp->exp_connection)
783                 ptlrpc_put_connection_superhack(exp->exp_connection);
784
785         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
786         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
787         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
788         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
789         obd_destroy_export(exp);
790         class_decref(obd, "export", exp);
791
792         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
793         EXIT;
794 }
795
/* hop_addref callback for export handles: takes one export reference.
 * The pointer returned by class_export_get() is not needed here. */
static void export_handle_addref(void *export)
{
        (void)class_export_get(export);
}
800
801 static struct portals_handle_ops export_handle_ops = {
802         .hop_addref = export_handle_addref,
803         .hop_free   = NULL,
804 };
805
806 struct obd_export *class_export_get(struct obd_export *exp)
807 {
808         cfs_atomic_inc(&exp->exp_refcount);
809         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
810                cfs_atomic_read(&exp->exp_refcount));
811         return exp;
812 }
813 EXPORT_SYMBOL(class_export_get);
814
815 void class_export_put(struct obd_export *exp)
816 {
817         LASSERT(exp != NULL);
818         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
819         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
820                cfs_atomic_read(&exp->exp_refcount) - 1);
821
822         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
823                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
824                 CDEBUG(D_IOCTL, "final put %p/%s\n",
825                        exp, exp->exp_client_uuid.uuid);
826
827                 /* release nid stat refererence */
828                 lprocfs_exp_cleanup(exp);
829
830                 obd_zombie_export_add(exp);
831         }
832 }
833 EXPORT_SYMBOL(class_export_put);
834
835 /* Creates a new export, adds it to the hash table, and returns a
836  * pointer to it. The refcount is 2: one for the hash reference, and
837  * one for the pointer returned by this function. */
838 struct obd_export *class_new_export(struct obd_device *obd,
839                                     struct obd_uuid *cluuid)
840 {
841         struct obd_export *export;
842         cfs_hash_t *hash = NULL;
843         int rc = 0;
844         ENTRY;
845
846         OBD_ALLOC_PTR(export);
847         if (!export)
848                 return ERR_PTR(-ENOMEM);
849
850         export->exp_conn_cnt = 0;
851         export->exp_lock_hash = NULL;
852         export->exp_flock_hash = NULL;
853         cfs_atomic_set(&export->exp_refcount, 2);
854         cfs_atomic_set(&export->exp_rpc_count, 0);
855         cfs_atomic_set(&export->exp_cb_count, 0);
856         cfs_atomic_set(&export->exp_locks_count, 0);
857 #if LUSTRE_TRACKS_LOCK_EXP_REFS
858         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
859         spin_lock_init(&export->exp_locks_list_guard);
860 #endif
861         cfs_atomic_set(&export->exp_replay_count, 0);
862         export->exp_obd = obd;
863         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
864         spin_lock_init(&export->exp_uncommitted_replies_lock);
865         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
866         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
867         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
868         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
869         class_handle_hash(&export->exp_handle, &export_handle_ops);
870         export->exp_last_request_time = cfs_time_current_sec();
871         spin_lock_init(&export->exp_lock);
872         spin_lock_init(&export->exp_rpc_lock);
873         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
874         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
875         spin_lock_init(&export->exp_bl_list_lock);
876         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
877
878         export->exp_sp_peer = LUSTRE_SP_ANY;
879         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
880         export->exp_client_uuid = *cluuid;
881         obd_init_export(export);
882
883         spin_lock(&obd->obd_dev_lock);
884         /* shouldn't happen, but might race */
885         if (obd->obd_stopping)
886                 GOTO(exit_unlock, rc = -ENODEV);
887
888         hash = cfs_hash_getref(obd->obd_uuid_hash);
889         if (hash == NULL)
890                 GOTO(exit_unlock, rc = -ENODEV);
891         spin_unlock(&obd->obd_dev_lock);
892
893         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
894                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
895                 if (rc != 0) {
896                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
897                                       obd->obd_name, cluuid->uuid, rc);
898                         GOTO(exit_err, rc = -EALREADY);
899                 }
900         }
901
902         spin_lock(&obd->obd_dev_lock);
903         if (obd->obd_stopping) {
904                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
905                 GOTO(exit_unlock, rc = -ENODEV);
906         }
907
908         class_incref(obd, "export", export);
909         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
910         cfs_list_add_tail(&export->exp_obd_chain_timed,
911                           &export->exp_obd->obd_exports_timed);
912         export->exp_obd->obd_num_exports++;
913         spin_unlock(&obd->obd_dev_lock);
914         cfs_hash_putref(hash);
915         RETURN(export);
916
917 exit_unlock:
918         spin_unlock(&obd->obd_dev_lock);
919 exit_err:
920         if (hash)
921                 cfs_hash_putref(hash);
922         class_handle_unhash(&export->exp_handle);
923         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
924         obd_destroy_export(export);
925         OBD_FREE_PTR(export);
926         return ERR_PTR(rc);
927 }
928 EXPORT_SYMBOL(class_new_export);
929
930 void class_unlink_export(struct obd_export *exp)
931 {
932         class_handle_unhash(&exp->exp_handle);
933
934         spin_lock(&exp->exp_obd->obd_dev_lock);
935         /* delete an uuid-export hashitem from hashtables */
936         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
937                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
938                              &exp->exp_client_uuid,
939                              &exp->exp_uuid_hash);
940
941         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
942         cfs_list_del_init(&exp->exp_obd_chain_timed);
943         exp->exp_obd->obd_num_exports--;
944         spin_unlock(&exp->exp_obd->obd_dev_lock);
945         class_export_put(exp);
946 }
947 EXPORT_SYMBOL(class_unlink_export);
948
949 /* Import management functions */
950 void class_import_destroy(struct obd_import *imp)
951 {
952         ENTRY;
953
954         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
955                 imp->imp_obd->obd_name);
956
957         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
958
959         ptlrpc_put_connection_superhack(imp->imp_connection);
960
961         while (!cfs_list_empty(&imp->imp_conn_list)) {
962                 struct obd_import_conn *imp_conn;
963
964                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
965                                           struct obd_import_conn, oic_item);
966                 cfs_list_del_init(&imp_conn->oic_item);
967                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
968                 OBD_FREE(imp_conn, sizeof(*imp_conn));
969         }
970
971         LASSERT(imp->imp_sec == NULL);
972         class_decref(imp->imp_obd, "import", imp);
973         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
974         EXIT;
975 }
976
/* Handle-table callback (installed as import_handle_ops.hop_addref):
 * takes one reference on the import passed as an untyped pointer. */
static void import_handle_addref(void *import)
{
        class_import_get(import);
}
981
/* Handle operations registered for every import by class_new_import().
 * No hop_free callback is provided here. */
static struct portals_handle_ops import_handle_ops = {
        .hop_addref = import_handle_addref,
        .hop_free   = NULL,
};
986
987 struct obd_import *class_import_get(struct obd_import *import)
988 {
989         cfs_atomic_inc(&import->imp_refcount);
990         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
991                cfs_atomic_read(&import->imp_refcount),
992                import->imp_obd->obd_name);
993         return import;
994 }
995 EXPORT_SYMBOL(class_import_get);
996
997 void class_import_put(struct obd_import *imp)
998 {
999         ENTRY;
1000
1001         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1002         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1003
1004         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1005                cfs_atomic_read(&imp->imp_refcount) - 1,
1006                imp->imp_obd->obd_name);
1007
1008         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1009                 CDEBUG(D_INFO, "final put import %p\n", imp);
1010                 obd_zombie_import_add(imp);
1011         }
1012
1013         /* catch possible import put race */
1014         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1015         EXIT;
1016 }
1017 EXPORT_SYMBOL(class_import_put);
1018
1019 static void init_imp_at(struct imp_at *at) {
1020         int i;
1021         at_init(&at->iat_net_latency, 0, 0);
1022         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1023                 /* max service estimates are tracked on the server side, so
1024                    don't use the AT history here, just use the last reported
1025                    val. (But keep hist for proc histogram, worst_ever) */
1026                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1027                         AT_FLG_NOHIST);
1028         }
1029 }
1030
1031 struct obd_import *class_new_import(struct obd_device *obd)
1032 {
1033         struct obd_import *imp;
1034
1035         OBD_ALLOC(imp, sizeof(*imp));
1036         if (imp == NULL)
1037                 return NULL;
1038
1039         CFS_INIT_LIST_HEAD(&imp->imp_pinger_chain);
1040         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1041         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1042         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1043         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1044         spin_lock_init(&imp->imp_lock);
1045         imp->imp_last_success_conn = 0;
1046         imp->imp_state = LUSTRE_IMP_NEW;
1047         imp->imp_obd = class_incref(obd, "import", imp);
1048         mutex_init(&imp->imp_sec_mutex);
1049         cfs_waitq_init(&imp->imp_recovery_waitq);
1050
1051         cfs_atomic_set(&imp->imp_refcount, 2);
1052         cfs_atomic_set(&imp->imp_unregistering, 0);
1053         cfs_atomic_set(&imp->imp_inflight, 0);
1054         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1055         cfs_atomic_set(&imp->imp_inval_count, 0);
1056         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1057         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1058         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1059         init_imp_at(&imp->imp_at);
1060
1061         /* the default magic is V2, will be used in connect RPC, and
1062          * then adjusted according to the flags in request/reply. */
1063         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1064
1065         return imp;
1066 }
1067 EXPORT_SYMBOL(class_new_import);
1068
1069 void class_destroy_import(struct obd_import *import)
1070 {
1071         LASSERT(import != NULL);
1072         LASSERT(import != LP_POISON);
1073
1074         class_handle_unhash(&import->imp_handle);
1075
1076         spin_lock(&import->imp_lock);
1077         import->imp_generation++;
1078         spin_unlock(&import->imp_lock);
1079         class_import_put(import);
1080 }
1081 EXPORT_SYMBOL(class_destroy_import);
1082
1083 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1084
1085 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1086 {
1087         spin_lock(&exp->exp_locks_list_guard);
1088
1089         LASSERT(lock->l_exp_refs_nr >= 0);
1090
1091         if (lock->l_exp_refs_target != NULL &&
1092             lock->l_exp_refs_target != exp) {
1093                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1094                               exp, lock, lock->l_exp_refs_target);
1095         }
1096         if ((lock->l_exp_refs_nr ++) == 0) {
1097                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1098                 lock->l_exp_refs_target = exp;
1099         }
1100         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1101                lock, exp, lock->l_exp_refs_nr);
1102         spin_unlock(&exp->exp_locks_list_guard);
1103 }
1104 EXPORT_SYMBOL(__class_export_add_lock_ref);
1105
1106 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1107 {
1108         spin_lock(&exp->exp_locks_list_guard);
1109         LASSERT(lock->l_exp_refs_nr > 0);
1110         if (lock->l_exp_refs_target != exp) {
1111                 LCONSOLE_WARN("lock %p, "
1112                               "mismatching export pointers: %p, %p\n",
1113                               lock, lock->l_exp_refs_target, exp);
1114         }
1115         if (-- lock->l_exp_refs_nr == 0) {
1116                 cfs_list_del_init(&lock->l_exp_refs_link);
1117                 lock->l_exp_refs_target = NULL;
1118         }
1119         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1120                lock, exp, lock->l_exp_refs_nr);
1121         spin_unlock(&exp->exp_locks_list_guard);
1122 }
1123 EXPORT_SYMBOL(__class_export_del_lock_ref);
1124 #endif
1125
1126 /* A connection defines an export context in which preallocation can
1127    be managed. This releases the export pointer reference, and returns
1128    the export handle, so the export refcount is 1 when this function
1129    returns. */
1130 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1131                   struct obd_uuid *cluuid)
1132 {
1133         struct obd_export *export;
1134         LASSERT(conn != NULL);
1135         LASSERT(obd != NULL);
1136         LASSERT(cluuid != NULL);
1137         ENTRY;
1138
1139         export = class_new_export(obd, cluuid);
1140         if (IS_ERR(export))
1141                 RETURN(PTR_ERR(export));
1142
1143         conn->cookie = export->exp_handle.h_cookie;
1144         class_export_put(export);
1145
1146         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1147                cluuid->uuid, conn->cookie);
1148         RETURN(0);
1149 }
1150 EXPORT_SYMBOL(class_connect);
1151
1152 /* if export is involved in recovery then clean up related things */
1153 void class_export_recovery_cleanup(struct obd_export *exp)
1154 {
1155         struct obd_device *obd = exp->exp_obd;
1156
1157         spin_lock(&obd->obd_recovery_task_lock);
1158         if (exp->exp_delayed)
1159                 obd->obd_delayed_clients--;
1160         if (obd->obd_recovering) {
1161                 if (exp->exp_in_recovery) {
1162                         spin_lock(&exp->exp_lock);
1163                         exp->exp_in_recovery = 0;
1164                         spin_unlock(&exp->exp_lock);
1165                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1166                         cfs_atomic_dec(&obd->obd_connected_clients);
1167                 }
1168
1169                 /* if called during recovery then should update
1170                  * obd_stale_clients counter,
1171                  * lightweight exports are not counted */
1172                 if (exp->exp_failed &&
1173                     (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1174                         exp->exp_obd->obd_stale_clients++;
1175         }
1176         spin_unlock(&obd->obd_recovery_task_lock);
1177         /** Cleanup req replay fields */
1178         if (exp->exp_req_replay_needed) {
1179                 spin_lock(&exp->exp_lock);
1180                 exp->exp_req_replay_needed = 0;
1181                 spin_unlock(&exp->exp_lock);
1182                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1183                 cfs_atomic_dec(&obd->obd_req_replay_clients);
1184         }
1185         /** Cleanup lock replay data */
1186         if (exp->exp_lock_replay_needed) {
1187                 spin_lock(&exp->exp_lock);
1188                 exp->exp_lock_replay_needed = 0;
1189                 spin_unlock(&exp->exp_lock);
1190                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1191                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1192         }
1193 }
1194
1195 /* This function removes 1-3 references from the export:
1196  * 1 - for export pointer passed
1197  * and if disconnect really need
1198  * 2 - removing from hash
1199  * 3 - in client_unlink_export
1200  * The export pointer passed to this function can destroyed */
1201 int class_disconnect(struct obd_export *export)
1202 {
1203         int already_disconnected;
1204         ENTRY;
1205
1206         if (export == NULL) {
1207                 CWARN("attempting to free NULL export %p\n", export);
1208                 RETURN(-EINVAL);
1209         }
1210
1211         spin_lock(&export->exp_lock);
1212         already_disconnected = export->exp_disconnected;
1213         export->exp_disconnected = 1;
1214         spin_unlock(&export->exp_lock);
1215
1216         /* class_cleanup(), abort_recovery(), and class_fail_export()
1217          * all end up in here, and if any of them race we shouldn't
1218          * call extra class_export_puts(). */
1219         if (already_disconnected) {
1220                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1221                 GOTO(no_disconn, already_disconnected);
1222         }
1223
1224         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1225                export->exp_handle.h_cookie);
1226
1227         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1228                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1229                              &export->exp_connection->c_peer.nid,
1230                              &export->exp_nid_hash);
1231
1232         class_export_recovery_cleanup(export);
1233         class_unlink_export(export);
1234 no_disconn:
1235         class_export_put(export);
1236         RETURN(0);
1237 }
1238 EXPORT_SYMBOL(class_disconnect);
1239
1240 /* Return non-zero for a fully connected export */
1241 int class_connected_export(struct obd_export *exp)
1242 {
1243         if (exp) {
1244                 int connected;
1245                 spin_lock(&exp->exp_lock);
1246                 connected = (exp->exp_conn_cnt > 0);
1247                 spin_unlock(&exp->exp_lock);
1248                 return connected;
1249         }
1250         return 0;
1251 }
1252 EXPORT_SYMBOL(class_connected_export);
1253
1254 static void class_disconnect_export_list(cfs_list_t *list,
1255                                          enum obd_option flags)
1256 {
1257         int rc;
1258         struct obd_export *exp;
1259         ENTRY;
1260
1261         /* It's possible that an export may disconnect itself, but
1262          * nothing else will be added to this list. */
1263         while (!cfs_list_empty(list)) {
1264                 exp = cfs_list_entry(list->next, struct obd_export,
1265                                      exp_obd_chain);
1266                 /* need for safe call CDEBUG after obd_disconnect */
1267                 class_export_get(exp);
1268
1269                 spin_lock(&exp->exp_lock);
1270                 exp->exp_flags = flags;
1271                 spin_unlock(&exp->exp_lock);
1272
1273                 if (obd_uuid_equals(&exp->exp_client_uuid,
1274                                     &exp->exp_obd->obd_uuid)) {
1275                         CDEBUG(D_HA,
1276                                "exp %p export uuid == obd uuid, don't discon\n",
1277                                exp);
1278                         /* Need to delete this now so we don't end up pointing
1279                          * to work_list later when this export is cleaned up. */
1280                         cfs_list_del_init(&exp->exp_obd_chain);
1281                         class_export_put(exp);
1282                         continue;
1283                 }
1284
1285                 class_export_get(exp);
1286                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1287                        "last request at "CFS_TIME_T"\n",
1288                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1289                        exp, exp->exp_last_request_time);
1290                 /* release one export reference anyway */
1291                 rc = obd_disconnect(exp);
1292
1293                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1294                        obd_export_nid2str(exp), exp, rc);
1295                 class_export_put(exp);
1296         }
1297         EXIT;
1298 }
1299
1300 void class_disconnect_exports(struct obd_device *obd)
1301 {
1302         cfs_list_t work_list;
1303         ENTRY;
1304
1305         /* Move all of the exports from obd_exports to a work list, en masse. */
1306         CFS_INIT_LIST_HEAD(&work_list);
1307         spin_lock(&obd->obd_dev_lock);
1308         cfs_list_splice_init(&obd->obd_exports, &work_list);
1309         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1310         spin_unlock(&obd->obd_dev_lock);
1311
1312         if (!cfs_list_empty(&work_list)) {
1313                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1314                        "disconnecting them\n", obd->obd_minor, obd);
1315                 class_disconnect_export_list(&work_list,
1316                                              exp_flags_from_obd(obd));
1317         } else
1318                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1319                        obd->obd_minor, obd);
1320         EXIT;
1321 }
1322 EXPORT_SYMBOL(class_disconnect_exports);
1323
1324 /* Remove exports that have not completed recovery.
1325  */
1326 void class_disconnect_stale_exports(struct obd_device *obd,
1327                                     int (*test_export)(struct obd_export *))
1328 {
1329         cfs_list_t work_list;
1330         struct obd_export *exp, *n;
1331         int evicted = 0;
1332         ENTRY;
1333
1334         CFS_INIT_LIST_HEAD(&work_list);
1335         spin_lock(&obd->obd_dev_lock);
1336         cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1337                                      exp_obd_chain) {
1338                 /* don't count self-export as client */
1339                 if (obd_uuid_equals(&exp->exp_client_uuid,
1340                                     &exp->exp_obd->obd_uuid))
1341                         continue;
1342
1343                 /* don't evict clients which have no slot in last_rcvd
1344                  * (e.g. lightweight connection) */
1345                 if (exp->exp_target_data.ted_lr_idx == -1)
1346                         continue;
1347
1348                 spin_lock(&exp->exp_lock);
1349                 if (exp->exp_failed || test_export(exp)) {
1350                         spin_unlock(&exp->exp_lock);
1351                         continue;
1352                 }
1353                 exp->exp_failed = 1;
1354                 spin_unlock(&exp->exp_lock);
1355
1356                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1357                 evicted++;
1358                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1359                        obd->obd_name, exp->exp_client_uuid.uuid,
1360                        exp->exp_connection == NULL ? "<unknown>" :
1361                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1362                 print_export_data(exp, "EVICTING", 0);
1363         }
1364         spin_unlock(&obd->obd_dev_lock);
1365
1366         if (evicted)
1367                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1368                               obd->obd_name, evicted);
1369
1370         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1371                                                  OBD_OPT_ABORT_RECOV);
1372         EXIT;
1373 }
1374 EXPORT_SYMBOL(class_disconnect_stale_exports);
1375
1376 void class_fail_export(struct obd_export *exp)
1377 {
1378         int rc, already_failed;
1379
1380         spin_lock(&exp->exp_lock);
1381         already_failed = exp->exp_failed;
1382         exp->exp_failed = 1;
1383         spin_unlock(&exp->exp_lock);
1384
1385         if (already_failed) {
1386                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1387                        exp, exp->exp_client_uuid.uuid);
1388                 return;
1389         }
1390
1391         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1392                exp, exp->exp_client_uuid.uuid);
1393
1394         if (obd_dump_on_timeout)
1395                 libcfs_debug_dumplog();
1396
1397         /* need for safe call CDEBUG after obd_disconnect */
1398         class_export_get(exp);
1399
1400         /* Most callers into obd_disconnect are removing their own reference
1401          * (request, for example) in addition to the one from the hash table.
1402          * We don't have such a reference here, so make one. */
1403         class_export_get(exp);
1404         rc = obd_disconnect(exp);
1405         if (rc)
1406                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1407         else
1408                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1409                        exp, exp->exp_client_uuid.uuid);
1410         class_export_put(exp);
1411 }
1412 EXPORT_SYMBOL(class_fail_export);
1413
1414 char *obd_export_nid2str(struct obd_export *exp)
1415 {
1416         if (exp->exp_connection != NULL)
1417                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1418
1419         return "(no nid)";
1420 }
1421 EXPORT_SYMBOL(obd_export_nid2str);
1422
1423 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1424 {
1425         cfs_hash_t *nid_hash;
1426         struct obd_export *doomed_exp = NULL;
1427         int exports_evicted = 0;
1428
1429         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1430
1431         spin_lock(&obd->obd_dev_lock);
1432         /* umount has run already, so evict thread should leave
1433          * its task to umount thread now */
1434         if (obd->obd_stopping) {
1435                 spin_unlock(&obd->obd_dev_lock);
1436                 return exports_evicted;
1437         }
1438         nid_hash = obd->obd_nid_hash;
1439         cfs_hash_getref(nid_hash);
1440         spin_unlock(&obd->obd_dev_lock);
1441
1442         do {
1443                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1444                 if (doomed_exp == NULL)
1445                         break;
1446
1447                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1448                          "nid %s found, wanted nid %s, requested nid %s\n",
1449                          obd_export_nid2str(doomed_exp),
1450                          libcfs_nid2str(nid_key), nid);
1451                 LASSERTF(doomed_exp != obd->obd_self_export,
1452                          "self-export is hashed by NID?\n");
1453                 exports_evicted++;
1454                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1455                               "request\n", obd->obd_name,
1456                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1457                               obd_export_nid2str(doomed_exp));
1458                 class_fail_export(doomed_exp);
1459                 class_export_put(doomed_exp);
1460         } while (1);
1461
1462         cfs_hash_putref(nid_hash);
1463
1464         if (!exports_evicted)
1465                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1466                        obd->obd_name, nid);
1467         return exports_evicted;
1468 }
1469 EXPORT_SYMBOL(obd_export_evict_by_nid);
1470
1471 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1472 {
1473         cfs_hash_t *uuid_hash;
1474         struct obd_export *doomed_exp = NULL;
1475         struct obd_uuid doomed_uuid;
1476         int exports_evicted = 0;
1477
1478         spin_lock(&obd->obd_dev_lock);
1479         if (obd->obd_stopping) {
1480                 spin_unlock(&obd->obd_dev_lock);
1481                 return exports_evicted;
1482         }
1483         uuid_hash = obd->obd_uuid_hash;
1484         cfs_hash_getref(uuid_hash);
1485         spin_unlock(&obd->obd_dev_lock);
1486
1487         obd_str2uuid(&doomed_uuid, uuid);
1488         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1489                 CERROR("%s: can't evict myself\n", obd->obd_name);
1490                 cfs_hash_putref(uuid_hash);
1491                 return exports_evicted;
1492         }
1493
1494         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1495
1496         if (doomed_exp == NULL) {
1497                 CERROR("%s: can't disconnect %s: no exports found\n",
1498                        obd->obd_name, uuid);
1499         } else {
1500                 CWARN("%s: evicting %s at adminstrative request\n",
1501                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1502                 class_fail_export(doomed_exp);
1503                 class_export_put(doomed_exp);
1504                 exports_evicted++;
1505         }
1506         cfs_hash_putref(uuid_hash);
1507
1508         return exports_evicted;
1509 }
1510 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1511
1512 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback invoked by print_export_data() for an export when
 * lock dumping is requested; NULL until some other code installs one. */
void (*class_export_dump_hook)(struct obd_export*) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
1515 #endif
1516
1517 static void print_export_data(struct obd_export *exp, const char *status,
1518                               int locks)
1519 {
1520         struct ptlrpc_reply_state *rs;
1521         struct ptlrpc_reply_state *first_reply = NULL;
1522         int nreplies = 0;
1523
1524         spin_lock(&exp->exp_lock);
1525         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1526                                 rs_exp_list) {
1527                 if (nreplies == 0)
1528                         first_reply = rs;
1529                 nreplies++;
1530         }
1531         spin_unlock(&exp->exp_lock);
1532
1533         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1534                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1535                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1536                cfs_atomic_read(&exp->exp_rpc_count),
1537                cfs_atomic_read(&exp->exp_cb_count),
1538                cfs_atomic_read(&exp->exp_locks_count),
1539                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1540                nreplies, first_reply, nreplies > 3 ? "..." : "",
1541                exp->exp_last_committed);
1542 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1543         if (locks && class_export_dump_hook != NULL)
1544                 class_export_dump_hook(exp);
1545 #endif
1546 }
1547
1548 void dump_exports(struct obd_device *obd, int locks)
1549 {
1550         struct obd_export *exp;
1551
1552         spin_lock(&obd->obd_dev_lock);
1553         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1554                 print_export_data(exp, "ACTIVE", locks);
1555         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1556                 print_export_data(exp, "UNLINKED", locks);
1557         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1558                 print_export_data(exp, "DELAYED", locks);
1559         spin_unlock(&obd->obd_dev_lock);
1560         spin_lock(&obd_zombie_impexp_lock);
1561         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1562                 print_export_data(exp, "ZOMBIE", locks);
1563         spin_unlock(&obd_zombie_impexp_lock);
1564 }
1565 EXPORT_SYMBOL(dump_exports);
1566
1567 void obd_exports_barrier(struct obd_device *obd)
1568 {
1569         int waited = 2;
1570         LASSERT(cfs_list_empty(&obd->obd_exports));
1571         spin_lock(&obd->obd_dev_lock);
1572         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1573                 spin_unlock(&obd->obd_dev_lock);
1574                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1575                                                    cfs_time_seconds(waited));
1576                 if (waited > 5 && IS_PO2(waited)) {
1577                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1578                                       "more than %d seconds. "
1579                                       "The obd refcount = %d. Is it stuck?\n",
1580                                       obd->obd_name, waited,
1581                                       cfs_atomic_read(&obd->obd_refcount));
1582                         dump_exports(obd, 1);
1583                 }
1584                 waited *= 2;
1585                 spin_lock(&obd->obd_dev_lock);
1586         }
1587         spin_unlock(&obd->obd_dev_lock);
1588 }
1589 EXPORT_SYMBOL(obd_exports_barrier);
1590
/* Total number of zombie imports and exports still waiting to be
 * destroyed; updated under obd_zombie_impexp_lock. */
static int zombies_count = 0;
1593
1594 /**
1595  * kill zombie imports and exports
1596  */
1597 void obd_zombie_impexp_cull(void)
1598 {
1599         struct obd_import *import;
1600         struct obd_export *export;
1601         ENTRY;
1602
1603         do {
1604                 spin_lock(&obd_zombie_impexp_lock);
1605
1606                 import = NULL;
1607                 if (!cfs_list_empty(&obd_zombie_imports)) {
1608                         import = cfs_list_entry(obd_zombie_imports.next,
1609                                                 struct obd_import,
1610                                                 imp_zombie_chain);
1611                         cfs_list_del_init(&import->imp_zombie_chain);
1612                 }
1613
1614                 export = NULL;
1615                 if (!cfs_list_empty(&obd_zombie_exports)) {
1616                         export = cfs_list_entry(obd_zombie_exports.next,
1617                                                 struct obd_export,
1618                                                 exp_obd_chain);
1619                         cfs_list_del_init(&export->exp_obd_chain);
1620                 }
1621
1622                 spin_unlock(&obd_zombie_impexp_lock);
1623
1624                 if (import != NULL) {
1625                         class_import_destroy(import);
1626                         spin_lock(&obd_zombie_impexp_lock);
1627                         zombies_count--;
1628                         spin_unlock(&obd_zombie_impexp_lock);
1629                 }
1630
1631                 if (export != NULL) {
1632                         class_export_destroy(export);
1633                         spin_lock(&obd_zombie_impexp_lock);
1634                         zombies_count--;
1635                         spin_unlock(&obd_zombie_impexp_lock);
1636                 }
1637
1638                 cfs_cond_resched();
1639         } while (import != NULL || export != NULL);
1640         EXIT;
1641 }
1642
/* State of the zombie import/export destroy thread.
 * NOTE(review): the start/stop completions and the pid are presumably
 * used by the thread start/stop code elsewhere in this file -- confirm. */
static struct completion        obd_zombie_start;
static struct completion        obd_zombie_stop;
/* flag word; tested with test_bit() in obd_zombie_impexp_check() */
static unsigned long            obd_zombie_flags;
/* wait queue woken by obd_zombie_impexp_notify() */
static cfs_waitq_t              obd_zombie_waitq;
static pid_t                    obd_zombie_pid;

/* Flags kept in obd_zombie_flags.  OBD_ZOMBIE_STOP is passed to
 * test_bit(), i.e. it is used as a bit index there, not as a mask. */
enum {
        OBD_ZOMBIE_STOP         = 0x0001,
};
1652
1653 /**
1654  * check for work for kill zombie import/export thread.
1655  */
1656 static int obd_zombie_impexp_check(void *arg)
1657 {
1658         int rc;
1659
1660         spin_lock(&obd_zombie_impexp_lock);
1661         rc = (zombies_count == 0) &&
1662              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1663         spin_unlock(&obd_zombie_impexp_lock);
1664
1665         RETURN(rc);
1666 }
1667
1668 /**
1669  * Add export to the obd_zombe thread and notify it.
1670  */
1671 static void obd_zombie_export_add(struct obd_export *exp) {
1672         spin_lock(&exp->exp_obd->obd_dev_lock);
1673         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1674         cfs_list_del_init(&exp->exp_obd_chain);
1675         spin_unlock(&exp->exp_obd->obd_dev_lock);
1676         spin_lock(&obd_zombie_impexp_lock);
1677         zombies_count++;
1678         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1679         spin_unlock(&obd_zombie_impexp_lock);
1680
1681         obd_zombie_impexp_notify();
1682 }
1683
1684 /**
1685  * Add import to the obd_zombe thread and notify it.
1686  */
1687 static void obd_zombie_import_add(struct obd_import *imp) {
1688         LASSERT(imp->imp_sec == NULL);
1689         LASSERT(imp->imp_rq_pool == NULL);
1690         spin_lock(&obd_zombie_impexp_lock);
1691         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1692         zombies_count++;
1693         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1694         spin_unlock(&obd_zombie_impexp_lock);
1695
1696         obd_zombie_impexp_notify();
1697 }
1698
1699 /**
1700  * notify import/export destroy thread about new zombie.
1701  */
1702 static void obd_zombie_impexp_notify(void)
1703 {
1704         /*
1705          * Make sure obd_zomebie_impexp_thread get this notification.
1706          * It is possible this signal only get by obd_zombie_barrier, and
1707          * barrier gulps this notification and sleeps away and hangs ensues
1708          */
1709         cfs_waitq_broadcast(&obd_zombie_waitq);
1710 }
1711
1712 /**
1713  * check whether obd_zombie is idle
1714  */
1715 static int obd_zombie_is_idle(void)
1716 {
1717         int rc;
1718
1719         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1720         spin_lock(&obd_zombie_impexp_lock);
1721         rc = (zombies_count == 0);
1722         spin_unlock(&obd_zombie_impexp_lock);
1723         return rc;
1724 }
1725
1726 /**
1727  * wait when obd_zombie import/export queues become empty
1728  */
1729 void obd_zombie_barrier(void)
1730 {
1731         struct l_wait_info lwi = { 0 };
1732
1733         if (obd_zombie_pid == cfs_curproc_pid())
1734                 /* don't wait for myself */
1735                 return;
1736         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1737 }
1738 EXPORT_SYMBOL(obd_zombie_barrier);
1739
1740 #ifdef __KERNEL__
1741
1742 /**
1743  * destroy zombie export/import thread.
1744  */
1745 static int obd_zombie_impexp_thread(void *unused)
1746 {
1747         int rc;
1748
1749         rc = cfs_daemonize_ctxt("obd_zombid");
1750         if (rc != 0) {
1751                 complete(&obd_zombie_start);
1752                 RETURN(rc);
1753         }
1754
1755         complete(&obd_zombie_start);
1756
1757         obd_zombie_pid = cfs_curproc_pid();
1758
1759         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1760                 struct l_wait_info lwi = { 0 };
1761
1762                 l_wait_event(obd_zombie_waitq,
1763                              !obd_zombie_impexp_check(NULL), &lwi);
1764                 obd_zombie_impexp_cull();
1765
1766                 /*
1767                  * Notify obd_zombie_barrier callers that queues
1768                  * may be empty.
1769                  */
1770                 cfs_waitq_signal(&obd_zombie_waitq);
1771         }
1772
1773         complete(&obd_zombie_stop);
1774
1775         RETURN(0);
1776 }
1777
1778 #else /* ! KERNEL */
1779
1780 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1781 static void *obd_zombie_impexp_work_cb;
1782 static void *obd_zombie_impexp_idle_cb;
1783
1784 int obd_zombie_impexp_kill(void *arg)
1785 {
1786         int rc = 0;
1787
1788         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1789                 obd_zombie_impexp_cull();
1790                 rc = 1;
1791         }
1792         cfs_atomic_dec(&zombie_recur);
1793         return rc;
1794 }
1795
1796 #endif
1797
1798 /**
1799  * start destroy zombie import/export thread
1800  */
1801 int obd_zombie_impexp_init(void)
1802 {
1803         int rc;
1804
1805         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1806         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1807         spin_lock_init(&obd_zombie_impexp_lock);
1808         init_completion(&obd_zombie_start);
1809         init_completion(&obd_zombie_stop);
1810         cfs_waitq_init(&obd_zombie_waitq);
1811         obd_zombie_pid = 0;
1812
1813 #ifdef __KERNEL__
1814         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1815         if (rc < 0)
1816                 RETURN(rc);
1817
1818         wait_for_completion(&obd_zombie_start);
1819 #else
1820
1821         obd_zombie_impexp_work_cb =
1822                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1823                                                  &obd_zombie_impexp_kill, NULL);
1824
1825         obd_zombie_impexp_idle_cb =
1826                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1827                                                  &obd_zombie_impexp_check, NULL);
1828         rc = 0;
1829 #endif
1830         RETURN(rc);
1831 }
1832 /**
1833  * stop destroy zombie import/export thread
1834  */
1835 void obd_zombie_impexp_stop(void)
1836 {
1837         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1838         obd_zombie_impexp_notify();
1839 #ifdef __KERNEL__
1840         wait_for_completion(&obd_zombie_stop);
1841 #else
1842         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1843         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1844 #endif
1845 }
1846
1847 /***** Kernel-userspace comm helpers *******/
1848
1849 /* Get length of entire message, including header */
1850 int kuc_len(int payload_len)
1851 {
1852         return sizeof(struct kuc_hdr) + payload_len;
1853 }
1854 EXPORT_SYMBOL(kuc_len);
1855
1856 /* Get a pointer to kuc header, given a ptr to the payload
1857  * @param p Pointer to payload area
1858  * @returns Pointer to kuc header
1859  */
1860 struct kuc_hdr * kuc_ptr(void *p)
1861 {
1862         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1863         LASSERT(lh->kuc_magic == KUC_MAGIC);
1864         return lh;
1865 }
1866 EXPORT_SYMBOL(kuc_ptr);
1867
1868 /* Test if payload is part of kuc message
1869  * @param p Pointer to payload area
1870  * @returns boolean
1871  */
1872 int kuc_ispayload(void *p)
1873 {
1874         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1875
1876         if (kh->kuc_magic == KUC_MAGIC)
1877                 return 1;
1878         else
1879                 return 0;
1880 }
1881 EXPORT_SYMBOL(kuc_ispayload);
1882
1883 /* Alloc space for a message, and fill in header
1884  * @return Pointer to payload area
1885  */
1886 void *kuc_alloc(int payload_len, int transport, int type)
1887 {
1888         struct kuc_hdr *lh;
1889         int len = kuc_len(payload_len);
1890
1891         OBD_ALLOC(lh, len);
1892         if (lh == NULL)
1893                 return ERR_PTR(-ENOMEM);
1894
1895         lh->kuc_magic = KUC_MAGIC;
1896         lh->kuc_transport = transport;
1897         lh->kuc_msgtype = type;
1898         lh->kuc_msglen = len;
1899
1900         return (void *)(lh + 1);
1901 }
1902 EXPORT_SYMBOL(kuc_alloc);
1903
/* Free a kuc message
 * @param p Pointer to payload area
 * @param payload_len Size of the payload area; used with kuc_len() to
 *                    compute the size handed to OBD_FREE
 */
inline void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh;

        /* kuc_ptr() LASSERTs the header magic before the free happens. */
        lh = kuc_ptr(p);
        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1911
1912
1913