Whamcloud - gitweb
LU-1431 ptlrpc: PTLRPC_BRW_MAX_SIZE usage cleanup.
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
50 extern cfs_list_t obd_types;
51 spinlock_t obd_types_lock;
52
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
57
58 cfs_list_t      obd_zombie_imports;
59 cfs_list_t      obd_zombie_exports;
60 spinlock_t  obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         cfs_list_t *tmp;
102         struct obd_type *type;
103
104         spin_lock(&obd_types_lock);
105         cfs_list_for_each(tmp, &obd_types) {
106                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115 EXPORT_SYMBOL(class_search_type);
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124
125                 if (strcmp(modname, "obdfilter") == 0)
126                         modname = "ofd";
127
128                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129                         modname = LUSTRE_OSP_NAME;
130
131                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132                         modname = LUSTRE_MDT_NAME;
133
134                 if (!cfs_request_module("%s", modname)) {
135                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136                         type = class_search_type(name);
137                 } else {
138                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139                                            modname);
140                 }
141         }
142 #endif
143         if (type) {
144                 spin_lock(&type->obd_type_lock);
145                 type->typ_refcnt++;
146                 cfs_try_module_get(type->typ_dt_ops->o_owner);
147                 spin_unlock(&type->obd_type_lock);
148         }
149         return type;
150 }
151 EXPORT_SYMBOL(class_get_type);
152
153 void class_put_type(struct obd_type *type)
154 {
155         LASSERT(type);
156         spin_lock(&type->obd_type_lock);
157         type->typ_refcnt--;
158         cfs_module_put(type->typ_dt_ops->o_owner);
159         spin_unlock(&type->obd_type_lock);
160 }
161 EXPORT_SYMBOL(class_put_type);
162
163 #define CLASS_MAX_NAME 1024
164
165 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
166                         struct lprocfs_vars *vars, const char *name,
167                         struct lu_device_type *ldt)
168 {
169         struct obd_type *type;
170         int rc = 0;
171         ENTRY;
172
173         /* sanity check */
174         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
175
176         if (class_search_type(name)) {
177                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
178                 RETURN(-EEXIST);
179         }
180
181         rc = -ENOMEM;
182         OBD_ALLOC(type, sizeof(*type));
183         if (type == NULL)
184                 RETURN(rc);
185
186         OBD_ALLOC_PTR(type->typ_dt_ops);
187         OBD_ALLOC_PTR(type->typ_md_ops);
188         OBD_ALLOC(type->typ_name, strlen(name) + 1);
189
190         if (type->typ_dt_ops == NULL ||
191             type->typ_md_ops == NULL ||
192             type->typ_name == NULL)
193                 GOTO (failed, rc);
194
195         *(type->typ_dt_ops) = *dt_ops;
196         /* md_ops is optional */
197         if (md_ops)
198                 *(type->typ_md_ops) = *md_ops;
199         strcpy(type->typ_name, name);
200         spin_lock_init(&type->obd_type_lock);
201
202 #ifdef LPROCFS
203         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
204                                               vars, type);
205         if (IS_ERR(type->typ_procroot)) {
206                 rc = PTR_ERR(type->typ_procroot);
207                 type->typ_procroot = NULL;
208                 GOTO (failed, rc);
209         }
210 #endif
211         if (ldt != NULL) {
212                 type->typ_lu = ldt;
213                 rc = lu_device_type_init(ldt);
214                 if (rc != 0)
215                         GOTO (failed, rc);
216         }
217
218         spin_lock(&obd_types_lock);
219         cfs_list_add(&type->typ_chain, &obd_types);
220         spin_unlock(&obd_types_lock);
221
222         RETURN (0);
223
224  failed:
225         if (type->typ_name != NULL)
226                 OBD_FREE(type->typ_name, strlen(name) + 1);
227         if (type->typ_md_ops != NULL)
228                 OBD_FREE_PTR(type->typ_md_ops);
229         if (type->typ_dt_ops != NULL)
230                 OBD_FREE_PTR(type->typ_dt_ops);
231         OBD_FREE(type, sizeof(*type));
232         RETURN(rc);
233 }
234 EXPORT_SYMBOL(class_register_type);
235
236 int class_unregister_type(const char *name)
237 {
238         struct obd_type *type = class_search_type(name);
239         ENTRY;
240
241         if (!type) {
242                 CERROR("unknown obd type\n");
243                 RETURN(-EINVAL);
244         }
245
246         if (type->typ_refcnt) {
247                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
248                 /* This is a bad situation, let's make the best of it */
249                 /* Remove ops, but leave the name for debugging */
250                 OBD_FREE_PTR(type->typ_dt_ops);
251                 OBD_FREE_PTR(type->typ_md_ops);
252                 RETURN(-EBUSY);
253         }
254
255         /* we do not use type->typ_procroot as for compatibility purposes
256          * other modules can share names (i.e. lod can use lov entry). so
257          * we can't reference pointer as it can get invalided when another
258          * module removes the entry */
259         lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
260
261         if (type->typ_lu)
262                 lu_device_type_fini(type->typ_lu);
263
264         spin_lock(&obd_types_lock);
265         cfs_list_del(&type->typ_chain);
266         spin_unlock(&obd_types_lock);
267         OBD_FREE(type->typ_name, strlen(name) + 1);
268         if (type->typ_dt_ops != NULL)
269                 OBD_FREE_PTR(type->typ_dt_ops);
270         if (type->typ_md_ops != NULL)
271                 OBD_FREE_PTR(type->typ_md_ops);
272         OBD_FREE(type, sizeof(*type));
273         RETURN(0);
274 } /* class_unregister_type */
275 EXPORT_SYMBOL(class_unregister_type);
276
277 /**
278  * Create a new obd device.
279  *
280  * Find an empty slot in ::obd_devs[], create a new obd device in it.
281  *
282  * \param[in] type_name obd device type string.
283  * \param[in] name      obd device name.
284  *
285  * \retval NULL if create fails, otherwise return the obd device
286  *         pointer created.
287  */
288 struct obd_device *class_newdev(const char *type_name, const char *name)
289 {
290         struct obd_device *result = NULL;
291         struct obd_device *newdev;
292         struct obd_type *type = NULL;
293         int i;
294         int new_obd_minor = 0;
295         ENTRY;
296
297         if (strlen(name) >= MAX_OBD_NAME) {
298                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
299                 RETURN(ERR_PTR(-EINVAL));
300         }
301
302         type = class_get_type(type_name);
303         if (type == NULL){
304                 CERROR("OBD: unknown type: %s\n", type_name);
305                 RETURN(ERR_PTR(-ENODEV));
306         }
307
308         newdev = obd_device_alloc();
309         if (newdev == NULL)
310                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
311
312         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
313
314         write_lock(&obd_dev_lock);
315         for (i = 0; i < class_devno_max(); i++) {
316                 struct obd_device *obd = class_num2obd(i);
317
318                 if (obd && obd->obd_name &&
319                     (strcmp(name, obd->obd_name) == 0)) {
320                         CERROR("Device %s already exists at %d, won't add\n",
321                                name, i);
322                         if (result) {
323                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
324                                          "%p obd_magic %08x != %08x\n", result,
325                                          result->obd_magic, OBD_DEVICE_MAGIC);
326                                 LASSERTF(result->obd_minor == new_obd_minor,
327                                          "%p obd_minor %d != %d\n", result,
328                                          result->obd_minor, new_obd_minor);
329
330                                 obd_devs[result->obd_minor] = NULL;
331                                 result->obd_name[0]='\0';
332                          }
333                         result = ERR_PTR(-EEXIST);
334                         break;
335                 }
336                 if (!result && !obd) {
337                         result = newdev;
338                         result->obd_minor = i;
339                         new_obd_minor = i;
340                         result->obd_type = type;
341                         strncpy(result->obd_name, name,
342                                 sizeof(result->obd_name) - 1);
343                         obd_devs[i] = result;
344                 }
345         }
346         write_unlock(&obd_dev_lock);
347
348         if (result == NULL && i >= class_devno_max()) {
349                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
350                        class_devno_max());
351                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
352         }
353
354         if (IS_ERR(result))
355                 GOTO(out, result);
356
357         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
358                result->obd_name, result);
359
360         RETURN(result);
361 out:
362         obd_device_free(newdev);
363 out_type:
364         class_put_type(type);
365         return result;
366 }
367
368 void class_release_dev(struct obd_device *obd)
369 {
370         struct obd_type *obd_type = obd->obd_type;
371
372         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
373                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
374         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
375                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
376         LASSERT(obd_type != NULL);
377
378         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
379                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
380
381         write_lock(&obd_dev_lock);
382         obd_devs[obd->obd_minor] = NULL;
383         write_unlock(&obd_dev_lock);
384         obd_device_free(obd);
385
386         class_put_type(obd_type);
387 }
388
389 int class_name2dev(const char *name)
390 {
391         int i;
392
393         if (!name)
394                 return -1;
395
396         read_lock(&obd_dev_lock);
397         for (i = 0; i < class_devno_max(); i++) {
398                 struct obd_device *obd = class_num2obd(i);
399
400                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
401                         /* Make sure we finished attaching before we give
402                            out any references */
403                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
404                         if (obd->obd_attached) {
405                                 read_unlock(&obd_dev_lock);
406                                 return i;
407                         }
408                         break;
409                 }
410         }
411         read_unlock(&obd_dev_lock);
412
413         return -1;
414 }
415 EXPORT_SYMBOL(class_name2dev);
416
417 struct obd_device *class_name2obd(const char *name)
418 {
419         int dev = class_name2dev(name);
420
421         if (dev < 0 || dev > class_devno_max())
422                 return NULL;
423         return class_num2obd(dev);
424 }
425 EXPORT_SYMBOL(class_name2obd);
426
427 int class_uuid2dev(struct obd_uuid *uuid)
428 {
429         int i;
430
431         read_lock(&obd_dev_lock);
432         for (i = 0; i < class_devno_max(); i++) {
433                 struct obd_device *obd = class_num2obd(i);
434
435                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
436                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
437                         read_unlock(&obd_dev_lock);
438                         return i;
439                 }
440         }
441         read_unlock(&obd_dev_lock);
442
443         return -1;
444 }
445 EXPORT_SYMBOL(class_uuid2dev);
446
447 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
448 {
449         int dev = class_uuid2dev(uuid);
450         if (dev < 0)
451                 return NULL;
452         return class_num2obd(dev);
453 }
454 EXPORT_SYMBOL(class_uuid2obd);
455
456 /**
457  * Get obd device from ::obd_devs[]
458  *
459  * \param num [in] array index
460  *
461  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
462  *         otherwise return the obd device there.
463  */
464 struct obd_device *class_num2obd(int num)
465 {
466         struct obd_device *obd = NULL;
467
468         if (num < class_devno_max()) {
469                 obd = obd_devs[num];
470                 if (obd == NULL)
471                         return NULL;
472
473                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
474                          "%p obd_magic %08x != %08x\n",
475                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
476                 LASSERTF(obd->obd_minor == num,
477                          "%p obd_minor %0d != %0d\n",
478                          obd, obd->obd_minor, num);
479         }
480
481         return obd;
482 }
483 EXPORT_SYMBOL(class_num2obd);
484
485 /**
486  * Get obd devices count. Device in any
487  *    state are counted
488  * \retval obd device count
489  */
490 int get_devices_count(void)
491 {
492         int index, max_index = class_devno_max(), dev_count = 0;
493
494         read_lock(&obd_dev_lock);
495         for (index = 0; index <= max_index; index++) {
496                 struct obd_device *obd = class_num2obd(index);
497                 if (obd != NULL)
498                         dev_count++;
499         }
500         read_unlock(&obd_dev_lock);
501
502         return dev_count;
503 }
504 EXPORT_SYMBOL(get_devices_count);
505
506 void class_obd_list(void)
507 {
508         char *status;
509         int i;
510
511         read_lock(&obd_dev_lock);
512         for (i = 0; i < class_devno_max(); i++) {
513                 struct obd_device *obd = class_num2obd(i);
514
515                 if (obd == NULL)
516                         continue;
517                 if (obd->obd_stopping)
518                         status = "ST";
519                 else if (obd->obd_set_up)
520                         status = "UP";
521                 else if (obd->obd_attached)
522                         status = "AT";
523                 else
524                         status = "--";
525                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
526                          i, status, obd->obd_type->typ_name,
527                          obd->obd_name, obd->obd_uuid.uuid,
528                          cfs_atomic_read(&obd->obd_refcount));
529         }
530         read_unlock(&obd_dev_lock);
531         return;
532 }
533
534 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
535    specified, then only the client with that uuid is returned,
536    otherwise any client connected to the tgt is returned. */
537 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
538                                           const char * typ_name,
539                                           struct obd_uuid *grp_uuid)
540 {
541         int i;
542
543         read_lock(&obd_dev_lock);
544         for (i = 0; i < class_devno_max(); i++) {
545                 struct obd_device *obd = class_num2obd(i);
546
547                 if (obd == NULL)
548                         continue;
549                 if ((strncmp(obd->obd_type->typ_name, typ_name,
550                              strlen(typ_name)) == 0)) {
551                         if (obd_uuid_equals(tgt_uuid,
552                                             &obd->u.cli.cl_target_uuid) &&
553                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
554                                                          &obd->obd_uuid) : 1)) {
555                                 read_unlock(&obd_dev_lock);
556                                 return obd;
557                         }
558                 }
559         }
560         read_unlock(&obd_dev_lock);
561
562         return NULL;
563 }
564 EXPORT_SYMBOL(class_find_client_obd);
565
566 /* Iterate the obd_device list looking devices have grp_uuid. Start
567    searching at *next, and if a device is found, the next index to look
568    at is saved in *next. If next is NULL, then the first matching device
569    will always be returned. */
570 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
571 {
572         int i;
573
574         if (next == NULL)
575                 i = 0;
576         else if (*next >= 0 && *next < class_devno_max())
577                 i = *next;
578         else
579                 return NULL;
580
581         read_lock(&obd_dev_lock);
582         for (; i < class_devno_max(); i++) {
583                 struct obd_device *obd = class_num2obd(i);
584
585                 if (obd == NULL)
586                         continue;
587                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
588                         if (next != NULL)
589                                 *next = i+1;
590                         read_unlock(&obd_dev_lock);
591                         return obd;
592                 }
593         }
594         read_unlock(&obd_dev_lock);
595
596         return NULL;
597 }
598 EXPORT_SYMBOL(class_devices_in_group);
599
600 /**
601  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
602  * adjust sptlrpc settings accordingly.
603  */
604 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
605 {
606         struct obd_device  *obd;
607         const char         *type;
608         int                 i, rc = 0, rc2;
609
610         LASSERT(namelen > 0);
611
612         read_lock(&obd_dev_lock);
613         for (i = 0; i < class_devno_max(); i++) {
614                 obd = class_num2obd(i);
615
616                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
617                         continue;
618
619                 /* only notify mdc, osc, mdt, ost */
620                 type = obd->obd_type->typ_name;
621                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
622                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
623                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
624                     strcmp(type, LUSTRE_OST_NAME) != 0)
625                         continue;
626
627                 if (strncmp(obd->obd_name, fsname, namelen))
628                         continue;
629
630                 class_incref(obd, __FUNCTION__, obd);
631                 read_unlock(&obd_dev_lock);
632                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
633                                          sizeof(KEY_SPTLRPC_CONF),
634                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
635                 rc = rc ? rc : rc2;
636                 class_decref(obd, __FUNCTION__, obd);
637                 read_lock(&obd_dev_lock);
638         }
639         read_unlock(&obd_dev_lock);
640         return rc;
641 }
642 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
643
644 void obd_cleanup_caches(void)
645 {
646         int rc;
647
648         ENTRY;
649         if (obd_device_cachep) {
650                 rc = cfs_mem_cache_destroy(obd_device_cachep);
651                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
652                 obd_device_cachep = NULL;
653         }
654         if (obdo_cachep) {
655                 rc = cfs_mem_cache_destroy(obdo_cachep);
656                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
657                 obdo_cachep = NULL;
658         }
659         if (import_cachep) {
660                 rc = cfs_mem_cache_destroy(import_cachep);
661                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
662                 import_cachep = NULL;
663         }
664         if (capa_cachep) {
665                 rc = cfs_mem_cache_destroy(capa_cachep);
666                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
667                 capa_cachep = NULL;
668         }
669         EXIT;
670 }
671
672 int obd_init_caches(void)
673 {
674         ENTRY;
675
676         LASSERT(obd_device_cachep == NULL);
677         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
678                                                  sizeof(struct obd_device),
679                                                  0, 0);
680         if (!obd_device_cachep)
681                 GOTO(out, -ENOMEM);
682
683         LASSERT(obdo_cachep == NULL);
684         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
685                                            0, 0);
686         if (!obdo_cachep)
687                 GOTO(out, -ENOMEM);
688
689         LASSERT(import_cachep == NULL);
690         import_cachep = cfs_mem_cache_create("ll_import_cache",
691                                              sizeof(struct obd_import),
692                                              0, 0);
693         if (!import_cachep)
694                 GOTO(out, -ENOMEM);
695
696         LASSERT(capa_cachep == NULL);
697         capa_cachep = cfs_mem_cache_create("capa_cache",
698                                            sizeof(struct obd_capa), 0, 0);
699         if (!capa_cachep)
700                 GOTO(out, -ENOMEM);
701
702         RETURN(0);
703  out:
704         obd_cleanup_caches();
705         RETURN(-ENOMEM);
706
707 }
708
709 /* map connection to client */
710 struct obd_export *class_conn2export(struct lustre_handle *conn)
711 {
712         struct obd_export *export;
713         ENTRY;
714
715         if (!conn) {
716                 CDEBUG(D_CACHE, "looking for null handle\n");
717                 RETURN(NULL);
718         }
719
720         if (conn->cookie == -1) {  /* this means assign a new connection */
721                 CDEBUG(D_CACHE, "want a new connection\n");
722                 RETURN(NULL);
723         }
724
725         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
726         export = class_handle2object(conn->cookie);
727         RETURN(export);
728 }
729 EXPORT_SYMBOL(class_conn2export);
730
731 struct obd_device *class_exp2obd(struct obd_export *exp)
732 {
733         if (exp)
734                 return exp->exp_obd;
735         return NULL;
736 }
737 EXPORT_SYMBOL(class_exp2obd);
738
739 struct obd_device *class_conn2obd(struct lustre_handle *conn)
740 {
741         struct obd_export *export;
742         export = class_conn2export(conn);
743         if (export) {
744                 struct obd_device *obd = export->exp_obd;
745                 class_export_put(export);
746                 return obd;
747         }
748         return NULL;
749 }
750 EXPORT_SYMBOL(class_conn2obd);
751
752 struct obd_import *class_exp2cliimp(struct obd_export *exp)
753 {
754         struct obd_device *obd = exp->exp_obd;
755         if (obd == NULL)
756                 return NULL;
757         return obd->u.cli.cl_import;
758 }
759 EXPORT_SYMBOL(class_exp2cliimp);
760
761 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
762 {
763         struct obd_device *obd = class_conn2obd(conn);
764         if (obd == NULL)
765                 return NULL;
766         return obd->u.cli.cl_import;
767 }
768 EXPORT_SYMBOL(class_conn2cliimp);
769
770 /* Export management functions */
771 static void class_export_destroy(struct obd_export *exp)
772 {
773         struct obd_device *obd = exp->exp_obd;
774         ENTRY;
775
776         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
777         LASSERT(obd != NULL);
778
779         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
780                exp->exp_client_uuid.uuid, obd->obd_name);
781
782         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
783         if (exp->exp_connection)
784                 ptlrpc_put_connection_superhack(exp->exp_connection);
785
786         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
787         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
788         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
789         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
790         obd_destroy_export(exp);
791         class_decref(obd, "export", exp);
792
793         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
794         EXIT;
795 }
796
/* hop_addref callback for export handles: takes an export reference. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
801
802 static struct portals_handle_ops export_handle_ops = {
803         .hop_addref = export_handle_addref,
804         .hop_free   = NULL,
805 };
806
807 struct obd_export *class_export_get(struct obd_export *exp)
808 {
809         cfs_atomic_inc(&exp->exp_refcount);
810         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
811                cfs_atomic_read(&exp->exp_refcount));
812         return exp;
813 }
814 EXPORT_SYMBOL(class_export_get);
815
816 void class_export_put(struct obd_export *exp)
817 {
818         LASSERT(exp != NULL);
819         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
820         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
821                cfs_atomic_read(&exp->exp_refcount) - 1);
822
823         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
824                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
825                 CDEBUG(D_IOCTL, "final put %p/%s\n",
826                        exp, exp->exp_client_uuid.uuid);
827
828                 /* release nid stat refererence */
829                 lprocfs_exp_cleanup(exp);
830
831                 obd_zombie_export_add(exp);
832         }
833 }
834 EXPORT_SYMBOL(class_export_put);
835
836 /* Creates a new export, adds it to the hash table, and returns a
837  * pointer to it. The refcount is 2: one for the hash reference, and
838  * one for the pointer returned by this function. */
839 struct obd_export *class_new_export(struct obd_device *obd,
840                                     struct obd_uuid *cluuid)
841 {
842         struct obd_export *export;
843         cfs_hash_t *hash = NULL;
844         int rc = 0;
845         ENTRY;
846
847         OBD_ALLOC_PTR(export);
848         if (!export)
849                 return ERR_PTR(-ENOMEM);
850
851         export->exp_conn_cnt = 0;
852         export->exp_lock_hash = NULL;
853         export->exp_flock_hash = NULL;
854         cfs_atomic_set(&export->exp_refcount, 2);
855         cfs_atomic_set(&export->exp_rpc_count, 0);
856         cfs_atomic_set(&export->exp_cb_count, 0);
857         cfs_atomic_set(&export->exp_locks_count, 0);
858 #if LUSTRE_TRACKS_LOCK_EXP_REFS
859         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
860         spin_lock_init(&export->exp_locks_list_guard);
861 #endif
862         cfs_atomic_set(&export->exp_replay_count, 0);
863         export->exp_obd = obd;
864         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
865         spin_lock_init(&export->exp_uncommitted_replies_lock);
866         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
867         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
868         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
869         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
870         class_handle_hash(&export->exp_handle, &export_handle_ops);
871         export->exp_last_request_time = cfs_time_current_sec();
872         spin_lock_init(&export->exp_lock);
873         spin_lock_init(&export->exp_rpc_lock);
874         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
875         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
876         spin_lock_init(&export->exp_bl_list_lock);
877         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
878
879         export->exp_sp_peer = LUSTRE_SP_ANY;
880         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
881         export->exp_client_uuid = *cluuid;
882         obd_init_export(export);
883
884         spin_lock(&obd->obd_dev_lock);
885         /* shouldn't happen, but might race */
886         if (obd->obd_stopping)
887                 GOTO(exit_unlock, rc = -ENODEV);
888
889         hash = cfs_hash_getref(obd->obd_uuid_hash);
890         if (hash == NULL)
891                 GOTO(exit_unlock, rc = -ENODEV);
892         spin_unlock(&obd->obd_dev_lock);
893
894         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
895                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
896                 if (rc != 0) {
897                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
898                                       obd->obd_name, cluuid->uuid, rc);
899                         GOTO(exit_err, rc = -EALREADY);
900                 }
901         }
902
903         spin_lock(&obd->obd_dev_lock);
904         if (obd->obd_stopping) {
905                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
906                 GOTO(exit_unlock, rc = -ENODEV);
907         }
908
909         class_incref(obd, "export", export);
910         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
911         cfs_list_add_tail(&export->exp_obd_chain_timed,
912                           &export->exp_obd->obd_exports_timed);
913         export->exp_obd->obd_num_exports++;
914         spin_unlock(&obd->obd_dev_lock);
915         cfs_hash_putref(hash);
916         RETURN(export);
917
918 exit_unlock:
919         spin_unlock(&obd->obd_dev_lock);
920 exit_err:
921         if (hash)
922                 cfs_hash_putref(hash);
923         class_handle_unhash(&export->exp_handle);
924         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
925         obd_destroy_export(export);
926         OBD_FREE_PTR(export);
927         return ERR_PTR(rc);
928 }
929 EXPORT_SYMBOL(class_new_export);
930
931 void class_unlink_export(struct obd_export *exp)
932 {
933         class_handle_unhash(&exp->exp_handle);
934
935         spin_lock(&exp->exp_obd->obd_dev_lock);
936         /* delete an uuid-export hashitem from hashtables */
937         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
938                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
939                              &exp->exp_client_uuid,
940                              &exp->exp_uuid_hash);
941
942         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
943         cfs_list_del_init(&exp->exp_obd_chain_timed);
944         exp->exp_obd->obd_num_exports--;
945         spin_unlock(&exp->exp_obd->obd_dev_lock);
946         class_export_put(exp);
947 }
948 EXPORT_SYMBOL(class_unlink_export);
949
950 /* Import management functions */
951 void class_import_destroy(struct obd_import *imp)
952 {
953         ENTRY;
954
955         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
956                 imp->imp_obd->obd_name);
957
958         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
959
960         ptlrpc_put_connection_superhack(imp->imp_connection);
961
962         while (!cfs_list_empty(&imp->imp_conn_list)) {
963                 struct obd_import_conn *imp_conn;
964
965                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
966                                           struct obd_import_conn, oic_item);
967                 cfs_list_del_init(&imp_conn->oic_item);
968                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
969                 OBD_FREE(imp_conn, sizeof(*imp_conn));
970         }
971
972         LASSERT(imp->imp_sec == NULL);
973         class_decref(imp->imp_obd, "import", imp);
974         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
975         EXIT;
976 }
977
978 static void import_handle_addref(void *import)
979 {
980         class_import_get(import);
981 }
982
983 static struct portals_handle_ops import_handle_ops = {
984         .hop_addref = import_handle_addref,
985         .hop_free   = NULL,
986 };
987
988 struct obd_import *class_import_get(struct obd_import *import)
989 {
990         cfs_atomic_inc(&import->imp_refcount);
991         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
992                cfs_atomic_read(&import->imp_refcount),
993                import->imp_obd->obd_name);
994         return import;
995 }
996 EXPORT_SYMBOL(class_import_get);
997
998 void class_import_put(struct obd_import *imp)
999 {
1000         ENTRY;
1001
1002         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1003         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1004
1005         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1006                cfs_atomic_read(&imp->imp_refcount) - 1,
1007                imp->imp_obd->obd_name);
1008
1009         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1010                 CDEBUG(D_INFO, "final put import %p\n", imp);
1011                 obd_zombie_import_add(imp);
1012         }
1013
1014         /* catch possible import put race */
1015         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1016         EXIT;
1017 }
1018 EXPORT_SYMBOL(class_import_put);
1019
1020 static void init_imp_at(struct imp_at *at) {
1021         int i;
1022         at_init(&at->iat_net_latency, 0, 0);
1023         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1024                 /* max service estimates are tracked on the server side, so
1025                    don't use the AT history here, just use the last reported
1026                    val. (But keep hist for proc histogram, worst_ever) */
1027                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1028                         AT_FLG_NOHIST);
1029         }
1030 }
1031
1032 struct obd_import *class_new_import(struct obd_device *obd)
1033 {
1034         struct obd_import *imp;
1035
1036         OBD_ALLOC(imp, sizeof(*imp));
1037         if (imp == NULL)
1038                 return NULL;
1039
1040         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1041         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1042         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1043         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1044         spin_lock_init(&imp->imp_lock);
1045         imp->imp_last_success_conn = 0;
1046         imp->imp_state = LUSTRE_IMP_NEW;
1047         imp->imp_obd = class_incref(obd, "import", imp);
1048         mutex_init(&imp->imp_sec_mutex);
1049         cfs_waitq_init(&imp->imp_recovery_waitq);
1050
1051         cfs_atomic_set(&imp->imp_refcount, 2);
1052         cfs_atomic_set(&imp->imp_unregistering, 0);
1053         cfs_atomic_set(&imp->imp_inflight, 0);
1054         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1055         cfs_atomic_set(&imp->imp_inval_count, 0);
1056         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1057         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1058         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1059         init_imp_at(&imp->imp_at);
1060
1061         /* the default magic is V2, will be used in connect RPC, and
1062          * then adjusted according to the flags in request/reply. */
1063         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1064
1065         return imp;
1066 }
1067 EXPORT_SYMBOL(class_new_import);
1068
1069 void class_destroy_import(struct obd_import *import)
1070 {
1071         LASSERT(import != NULL);
1072         LASSERT(import != LP_POISON);
1073
1074         class_handle_unhash(&import->imp_handle);
1075
1076         spin_lock(&import->imp_lock);
1077         import->imp_generation++;
1078         spin_unlock(&import->imp_lock);
1079         class_import_put(import);
1080 }
1081 EXPORT_SYMBOL(class_destroy_import);
1082
#if LUSTRE_TRACKS_LOCK_EXP_REFS

/**
 * Record one more reference from \a lock to \a exp.
 *
 * On the first reference the lock is linked onto exp->exp_locks_list and
 * \a exp is remembered as the lock's target.  A console warning is issued
 * if the lock already targets a different export.
 */
void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
        int first_ref;

        spin_lock(&exp->exp_locks_list_guard);

        LASSERT(lock->l_exp_refs_nr >= 0);

        if (lock->l_exp_refs_target != NULL &&
            lock->l_exp_refs_target != exp)
                LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
                              exp, lock, lock->l_exp_refs_target);

        first_ref = (lock->l_exp_refs_nr == 0);
        lock->l_exp_refs_nr++;
        if (first_ref) {
                cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
                lock->l_exp_refs_target = exp;
        }

        CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
               lock, exp, lock->l_exp_refs_nr);

        spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_add_lock_ref);

/**
 * Drop one reference from \a lock to \a exp.
 *
 * When the count reaches zero the lock is unlinked from the export's lock
 * list and its target is cleared.  A console warning is issued if \a exp
 * is not the export the lock currently targets.
 */
void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
        spin_lock(&exp->exp_locks_list_guard);

        LASSERT(lock->l_exp_refs_nr > 0);

        if (lock->l_exp_refs_target != exp)
                LCONSOLE_WARN("lock %p, "
                              "mismatching export pointers: %p, %p\n",
                              lock, lock->l_exp_refs_target, exp);

        lock->l_exp_refs_nr--;
        if (lock->l_exp_refs_nr == 0) {
                cfs_list_del_init(&lock->l_exp_refs_link);
                lock->l_exp_refs_target = NULL;
        }

        CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
               lock, exp, lock->l_exp_refs_nr);

        spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_del_lock_ref);
#endif
1125
1126 /* A connection defines an export context in which preallocation can
1127    be managed. This releases the export pointer reference, and returns
1128    the export handle, so the export refcount is 1 when this function
1129    returns. */
1130 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1131                   struct obd_uuid *cluuid)
1132 {
1133         struct obd_export *export;
1134         LASSERT(conn != NULL);
1135         LASSERT(obd != NULL);
1136         LASSERT(cluuid != NULL);
1137         ENTRY;
1138
1139         export = class_new_export(obd, cluuid);
1140         if (IS_ERR(export))
1141                 RETURN(PTR_ERR(export));
1142
1143         conn->cookie = export->exp_handle.h_cookie;
1144         class_export_put(export);
1145
1146         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1147                cluuid->uuid, conn->cookie);
1148         RETURN(0);
1149 }
1150 EXPORT_SYMBOL(class_connect);
1151
1152 /* if export is involved in recovery then clean up related things */
1153 void class_export_recovery_cleanup(struct obd_export *exp)
1154 {
1155         struct obd_device *obd = exp->exp_obd;
1156
1157         spin_lock(&obd->obd_recovery_task_lock);
1158         if (exp->exp_delayed)
1159                 obd->obd_delayed_clients--;
1160         if (obd->obd_recovering) {
1161                 if (exp->exp_in_recovery) {
1162                         spin_lock(&exp->exp_lock);
1163                         exp->exp_in_recovery = 0;
1164                         spin_unlock(&exp->exp_lock);
1165                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1166                         cfs_atomic_dec(&obd->obd_connected_clients);
1167                 }
1168
1169                 /* if called during recovery then should update
1170                  * obd_stale_clients counter,
1171                  * lightweight exports are not counted */
1172                 if (exp->exp_failed &&
1173                     (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1174                         exp->exp_obd->obd_stale_clients++;
1175         }
1176         spin_unlock(&obd->obd_recovery_task_lock);
1177         /** Cleanup req replay fields */
1178         if (exp->exp_req_replay_needed) {
1179                 spin_lock(&exp->exp_lock);
1180                 exp->exp_req_replay_needed = 0;
1181                 spin_unlock(&exp->exp_lock);
1182                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1183                 cfs_atomic_dec(&obd->obd_req_replay_clients);
1184         }
1185         /** Cleanup lock replay data */
1186         if (exp->exp_lock_replay_needed) {
1187                 spin_lock(&exp->exp_lock);
1188                 exp->exp_lock_replay_needed = 0;
1189                 spin_unlock(&exp->exp_lock);
1190                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1191                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1192         }
1193 }
1194
1195 /* This function removes 1-3 references from the export:
1196  * 1 - for export pointer passed
1197  * and if disconnect really need
1198  * 2 - removing from hash
1199  * 3 - in client_unlink_export
1200  * The export pointer passed to this function can destroyed */
1201 int class_disconnect(struct obd_export *export)
1202 {
1203         int already_disconnected;
1204         ENTRY;
1205
1206         if (export == NULL) {
1207                 CWARN("attempting to free NULL export %p\n", export);
1208                 RETURN(-EINVAL);
1209         }
1210
1211         spin_lock(&export->exp_lock);
1212         already_disconnected = export->exp_disconnected;
1213         export->exp_disconnected = 1;
1214         spin_unlock(&export->exp_lock);
1215
1216         /* class_cleanup(), abort_recovery(), and class_fail_export()
1217          * all end up in here, and if any of them race we shouldn't
1218          * call extra class_export_puts(). */
1219         if (already_disconnected) {
1220                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1221                 GOTO(no_disconn, already_disconnected);
1222         }
1223
1224         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1225                export->exp_handle.h_cookie);
1226
1227         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1228                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1229                              &export->exp_connection->c_peer.nid,
1230                              &export->exp_nid_hash);
1231
1232         class_export_recovery_cleanup(export);
1233         class_unlink_export(export);
1234 no_disconn:
1235         class_export_put(export);
1236         RETURN(0);
1237 }
1238 EXPORT_SYMBOL(class_disconnect);
1239
1240 /* Return non-zero for a fully connected export */
1241 int class_connected_export(struct obd_export *exp)
1242 {
1243         if (exp) {
1244                 int connected;
1245                 spin_lock(&exp->exp_lock);
1246                 connected = (exp->exp_conn_cnt > 0);
1247                 spin_unlock(&exp->exp_lock);
1248                 return connected;
1249         }
1250         return 0;
1251 }
1252 EXPORT_SYMBOL(class_connected_export);
1253
1254 static void class_disconnect_export_list(cfs_list_t *list,
1255                                          enum obd_option flags)
1256 {
1257         int rc;
1258         struct obd_export *exp;
1259         ENTRY;
1260
1261         /* It's possible that an export may disconnect itself, but
1262          * nothing else will be added to this list. */
1263         while (!cfs_list_empty(list)) {
1264                 exp = cfs_list_entry(list->next, struct obd_export,
1265                                      exp_obd_chain);
1266                 /* need for safe call CDEBUG after obd_disconnect */
1267                 class_export_get(exp);
1268
1269                 spin_lock(&exp->exp_lock);
1270                 exp->exp_flags = flags;
1271                 spin_unlock(&exp->exp_lock);
1272
1273                 if (obd_uuid_equals(&exp->exp_client_uuid,
1274                                     &exp->exp_obd->obd_uuid)) {
1275                         CDEBUG(D_HA,
1276                                "exp %p export uuid == obd uuid, don't discon\n",
1277                                exp);
1278                         /* Need to delete this now so we don't end up pointing
1279                          * to work_list later when this export is cleaned up. */
1280                         cfs_list_del_init(&exp->exp_obd_chain);
1281                         class_export_put(exp);
1282                         continue;
1283                 }
1284
1285                 class_export_get(exp);
1286                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1287                        "last request at "CFS_TIME_T"\n",
1288                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1289                        exp, exp->exp_last_request_time);
1290                 /* release one export reference anyway */
1291                 rc = obd_disconnect(exp);
1292
1293                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1294                        obd_export_nid2str(exp), exp, rc);
1295                 class_export_put(exp);
1296         }
1297         EXIT;
1298 }
1299
1300 void class_disconnect_exports(struct obd_device *obd)
1301 {
1302         cfs_list_t work_list;
1303         ENTRY;
1304
1305         /* Move all of the exports from obd_exports to a work list, en masse. */
1306         CFS_INIT_LIST_HEAD(&work_list);
1307         spin_lock(&obd->obd_dev_lock);
1308         cfs_list_splice_init(&obd->obd_exports, &work_list);
1309         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1310         spin_unlock(&obd->obd_dev_lock);
1311
1312         if (!cfs_list_empty(&work_list)) {
1313                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1314                        "disconnecting them\n", obd->obd_minor, obd);
1315                 class_disconnect_export_list(&work_list,
1316                                              exp_flags_from_obd(obd));
1317         } else
1318                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1319                        obd->obd_minor, obd);
1320         EXIT;
1321 }
1322 EXPORT_SYMBOL(class_disconnect_exports);
1323
1324 /* Remove exports that have not completed recovery.
1325  */
1326 void class_disconnect_stale_exports(struct obd_device *obd,
1327                                     int (*test_export)(struct obd_export *))
1328 {
1329         cfs_list_t work_list;
1330         struct obd_export *exp, *n;
1331         int evicted = 0;
1332         ENTRY;
1333
1334         CFS_INIT_LIST_HEAD(&work_list);
1335         spin_lock(&obd->obd_dev_lock);
1336         cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1337                                      exp_obd_chain) {
1338                 /* don't count self-export as client */
1339                 if (obd_uuid_equals(&exp->exp_client_uuid,
1340                                     &exp->exp_obd->obd_uuid))
1341                         continue;
1342
1343                 /* don't evict clients which have no slot in last_rcvd
1344                  * (e.g. lightweight connection) */
1345                 if (exp->exp_target_data.ted_lr_idx == -1)
1346                         continue;
1347
1348                 spin_lock(&exp->exp_lock);
1349                 if (exp->exp_failed || test_export(exp)) {
1350                         spin_unlock(&exp->exp_lock);
1351                         continue;
1352                 }
1353                 exp->exp_failed = 1;
1354                 spin_unlock(&exp->exp_lock);
1355
1356                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1357                 evicted++;
1358                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1359                        obd->obd_name, exp->exp_client_uuid.uuid,
1360                        exp->exp_connection == NULL ? "<unknown>" :
1361                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1362                 print_export_data(exp, "EVICTING", 0);
1363         }
1364         spin_unlock(&obd->obd_dev_lock);
1365
1366         if (evicted)
1367                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1368                               obd->obd_name, evicted);
1369
1370         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1371                                                  OBD_OPT_ABORT_RECOV);
1372         EXIT;
1373 }
1374 EXPORT_SYMBOL(class_disconnect_stale_exports);
1375
1376 void class_fail_export(struct obd_export *exp)
1377 {
1378         int rc, already_failed;
1379
1380         spin_lock(&exp->exp_lock);
1381         already_failed = exp->exp_failed;
1382         exp->exp_failed = 1;
1383         spin_unlock(&exp->exp_lock);
1384
1385         if (already_failed) {
1386                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1387                        exp, exp->exp_client_uuid.uuid);
1388                 return;
1389         }
1390
1391         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1392                exp, exp->exp_client_uuid.uuid);
1393
1394         if (obd_dump_on_timeout)
1395                 libcfs_debug_dumplog();
1396
1397         /* need for safe call CDEBUG after obd_disconnect */
1398         class_export_get(exp);
1399
1400         /* Most callers into obd_disconnect are removing their own reference
1401          * (request, for example) in addition to the one from the hash table.
1402          * We don't have such a reference here, so make one. */
1403         class_export_get(exp);
1404         rc = obd_disconnect(exp);
1405         if (rc)
1406                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1407         else
1408                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1409                        exp, exp->exp_client_uuid.uuid);
1410         class_export_put(exp);
1411 }
1412 EXPORT_SYMBOL(class_fail_export);
1413
1414 char *obd_export_nid2str(struct obd_export *exp)
1415 {
1416         if (exp->exp_connection != NULL)
1417                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1418
1419         return "(no nid)";
1420 }
1421 EXPORT_SYMBOL(obd_export_nid2str);
1422
1423 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1424 {
1425         cfs_hash_t *nid_hash;
1426         struct obd_export *doomed_exp = NULL;
1427         int exports_evicted = 0;
1428
1429         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1430
1431         spin_lock(&obd->obd_dev_lock);
1432         /* umount has run already, so evict thread should leave
1433          * its task to umount thread now */
1434         if (obd->obd_stopping) {
1435                 spin_unlock(&obd->obd_dev_lock);
1436                 return exports_evicted;
1437         }
1438         nid_hash = obd->obd_nid_hash;
1439         cfs_hash_getref(nid_hash);
1440         spin_unlock(&obd->obd_dev_lock);
1441
1442         do {
1443                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1444                 if (doomed_exp == NULL)
1445                         break;
1446
1447                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1448                          "nid %s found, wanted nid %s, requested nid %s\n",
1449                          obd_export_nid2str(doomed_exp),
1450                          libcfs_nid2str(nid_key), nid);
1451                 LASSERTF(doomed_exp != obd->obd_self_export,
1452                          "self-export is hashed by NID?\n");
1453                 exports_evicted++;
1454                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1455                               "request\n", obd->obd_name,
1456                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1457                               obd_export_nid2str(doomed_exp));
1458                 class_fail_export(doomed_exp);
1459                 class_export_put(doomed_exp);
1460         } while (1);
1461
1462         cfs_hash_putref(nid_hash);
1463
1464         if (!exports_evicted)
1465                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1466                        obd->obd_name, nid);
1467         return exports_evicted;
1468 }
1469 EXPORT_SYMBOL(obd_export_evict_by_nid);
1470
1471 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1472 {
1473         cfs_hash_t *uuid_hash;
1474         struct obd_export *doomed_exp = NULL;
1475         struct obd_uuid doomed_uuid;
1476         int exports_evicted = 0;
1477
1478         spin_lock(&obd->obd_dev_lock);
1479         if (obd->obd_stopping) {
1480                 spin_unlock(&obd->obd_dev_lock);
1481                 return exports_evicted;
1482         }
1483         uuid_hash = obd->obd_uuid_hash;
1484         cfs_hash_getref(uuid_hash);
1485         spin_unlock(&obd->obd_dev_lock);
1486
1487         obd_str2uuid(&doomed_uuid, uuid);
1488         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1489                 CERROR("%s: can't evict myself\n", obd->obd_name);
1490                 cfs_hash_putref(uuid_hash);
1491                 return exports_evicted;
1492         }
1493
1494         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1495
1496         if (doomed_exp == NULL) {
1497                 CERROR("%s: can't disconnect %s: no exports found\n",
1498                        obd->obd_name, uuid);
1499         } else {
1500                 CWARN("%s: evicting %s at adminstrative request\n",
1501                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1502                 class_fail_export(doomed_exp);
1503                 class_export_put(doomed_exp);
1504                 exports_evicted++;
1505         }
1506         cfs_hash_putref(uuid_hash);
1507
1508         return exports_evicted;
1509 }
1510 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1511
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump callback.  print_export_data() invokes it when
 * lock dumping is requested and the hook has been set. */
void (*class_export_dump_hook)(struct obd_export *) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1516
1517 static void print_export_data(struct obd_export *exp, const char *status,
1518                               int locks)
1519 {
1520         struct ptlrpc_reply_state *rs;
1521         struct ptlrpc_reply_state *first_reply = NULL;
1522         int nreplies = 0;
1523
1524         spin_lock(&exp->exp_lock);
1525         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1526                                 rs_exp_list) {
1527                 if (nreplies == 0)
1528                         first_reply = rs;
1529                 nreplies++;
1530         }
1531         spin_unlock(&exp->exp_lock);
1532
1533         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1534                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1535                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1536                cfs_atomic_read(&exp->exp_rpc_count),
1537                cfs_atomic_read(&exp->exp_cb_count),
1538                cfs_atomic_read(&exp->exp_locks_count),
1539                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1540                nreplies, first_reply, nreplies > 3 ? "..." : "",
1541                exp->exp_last_committed);
1542 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1543         if (locks && class_export_dump_hook != NULL)
1544                 class_export_dump_hook(exp);
1545 #endif
1546 }
1547
1548 void dump_exports(struct obd_device *obd, int locks)
1549 {
1550         struct obd_export *exp;
1551
1552         spin_lock(&obd->obd_dev_lock);
1553         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1554                 print_export_data(exp, "ACTIVE", locks);
1555         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1556                 print_export_data(exp, "UNLINKED", locks);
1557         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1558                 print_export_data(exp, "DELAYED", locks);
1559         spin_unlock(&obd->obd_dev_lock);
1560         spin_lock(&obd_zombie_impexp_lock);
1561         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1562                 print_export_data(exp, "ZOMBIE", locks);
1563         spin_unlock(&obd_zombie_impexp_lock);
1564 }
1565 EXPORT_SYMBOL(dump_exports);
1566
1567 void obd_exports_barrier(struct obd_device *obd)
1568 {
1569         int waited = 2;
1570         LASSERT(cfs_list_empty(&obd->obd_exports));
1571         spin_lock(&obd->obd_dev_lock);
1572         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1573                 spin_unlock(&obd->obd_dev_lock);
1574                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1575                                                    cfs_time_seconds(waited));
1576                 if (waited > 5 && IS_PO2(waited)) {
1577                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1578                                       "more than %d seconds. "
1579                                       "The obd refcount = %d. Is it stuck?\n",
1580                                       obd->obd_name, waited,
1581                                       cfs_atomic_read(&obd->obd_refcount));
1582                         dump_exports(obd, 1);
1583                 }
1584                 waited *= 2;
1585                 spin_lock(&obd->obd_dev_lock);
1586         }
1587         spin_unlock(&obd->obd_dev_lock);
1588 }
1589 EXPORT_SYMBOL(obd_exports_barrier);
1590
1591 /* Total amount of zombies to be destroyed */
1592 static int zombies_count = 0;
1593
1594 /**
1595  * kill zombie imports and exports
1596  */
1597 void obd_zombie_impexp_cull(void)
1598 {
1599         struct obd_import *import;
1600         struct obd_export *export;
1601         ENTRY;
1602
1603         do {
1604                 spin_lock(&obd_zombie_impexp_lock);
1605
1606                 import = NULL;
1607                 if (!cfs_list_empty(&obd_zombie_imports)) {
1608                         import = cfs_list_entry(obd_zombie_imports.next,
1609                                                 struct obd_import,
1610                                                 imp_zombie_chain);
1611                         cfs_list_del_init(&import->imp_zombie_chain);
1612                 }
1613
1614                 export = NULL;
1615                 if (!cfs_list_empty(&obd_zombie_exports)) {
1616                         export = cfs_list_entry(obd_zombie_exports.next,
1617                                                 struct obd_export,
1618                                                 exp_obd_chain);
1619                         cfs_list_del_init(&export->exp_obd_chain);
1620                 }
1621
1622                 spin_unlock(&obd_zombie_impexp_lock);
1623
1624                 if (import != NULL) {
1625                         class_import_destroy(import);
1626                         spin_lock(&obd_zombie_impexp_lock);
1627                         zombies_count--;
1628                         spin_unlock(&obd_zombie_impexp_lock);
1629                 }
1630
1631                 if (export != NULL) {
1632                         class_export_destroy(export);
1633                         spin_lock(&obd_zombie_impexp_lock);
1634                         zombies_count--;
1635                         spin_unlock(&obd_zombie_impexp_lock);
1636                 }
1637
1638                 cfs_cond_resched();
1639         } while (import != NULL || export != NULL);
1640         EXIT;
1641 }
1642
1643 static struct completion        obd_zombie_start;
1644 static struct completion        obd_zombie_stop;
1645 static unsigned long            obd_zombie_flags;
1646 static cfs_waitq_t              obd_zombie_waitq;
1647 static pid_t                    obd_zombie_pid;
1648
1649 enum {
1650         OBD_ZOMBIE_STOP         = 0x0001,
1651 };
1652
1653 /**
1654  * check for work for kill zombie import/export thread.
1655  */
1656 static int obd_zombie_impexp_check(void *arg)
1657 {
1658         int rc;
1659
1660         spin_lock(&obd_zombie_impexp_lock);
1661         rc = (zombies_count == 0) &&
1662              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1663         spin_unlock(&obd_zombie_impexp_lock);
1664
1665         RETURN(rc);
1666 }
1667
1668 /**
1669  * Add export to the obd_zombe thread and notify it.
1670  */
1671 static void obd_zombie_export_add(struct obd_export *exp) {
1672         spin_lock(&exp->exp_obd->obd_dev_lock);
1673         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1674         cfs_list_del_init(&exp->exp_obd_chain);
1675         spin_unlock(&exp->exp_obd->obd_dev_lock);
1676         spin_lock(&obd_zombie_impexp_lock);
1677         zombies_count++;
1678         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1679         spin_unlock(&obd_zombie_impexp_lock);
1680
1681         obd_zombie_impexp_notify();
1682 }
1683
1684 /**
1685  * Add import to the obd_zombe thread and notify it.
1686  */
1687 static void obd_zombie_import_add(struct obd_import *imp) {
1688         LASSERT(imp->imp_sec == NULL);
1689         LASSERT(imp->imp_rq_pool == NULL);
1690         spin_lock(&obd_zombie_impexp_lock);
1691         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1692         zombies_count++;
1693         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1694         spin_unlock(&obd_zombie_impexp_lock);
1695
1696         obd_zombie_impexp_notify();
1697 }
1698
1699 /**
1700  * notify import/export destroy thread about new zombie.
1701  */
1702 static void obd_zombie_impexp_notify(void)
1703 {
1704         /*
1705          * Make sure obd_zomebie_impexp_thread get this notification.
1706          * It is possible this signal only get by obd_zombie_barrier, and
1707          * barrier gulps this notification and sleeps away and hangs ensues
1708          */
1709         cfs_waitq_broadcast(&obd_zombie_waitq);
1710 }
1711
1712 /**
1713  * check whether obd_zombie is idle
1714  */
1715 static int obd_zombie_is_idle(void)
1716 {
1717         int rc;
1718
1719         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1720         spin_lock(&obd_zombie_impexp_lock);
1721         rc = (zombies_count == 0);
1722         spin_unlock(&obd_zombie_impexp_lock);
1723         return rc;
1724 }
1725
1726 /**
1727  * wait when obd_zombie import/export queues become empty
1728  */
1729 void obd_zombie_barrier(void)
1730 {
1731         struct l_wait_info lwi = { 0 };
1732
1733         if (obd_zombie_pid == cfs_curproc_pid())
1734                 /* don't wait for myself */
1735                 return;
1736         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1737 }
1738 EXPORT_SYMBOL(obd_zombie_barrier);
1739
1740 #ifdef __KERNEL__
1741
1742 /**
1743  * destroy zombie export/import thread.
1744  */
1745 static int obd_zombie_impexp_thread(void *unused)
1746 {
1747         int rc;
1748
1749         rc = cfs_daemonize_ctxt("obd_zombid");
1750         if (rc != 0) {
1751                 complete(&obd_zombie_start);
1752                 RETURN(rc);
1753         }
1754
1755         complete(&obd_zombie_start);
1756
1757         obd_zombie_pid = cfs_curproc_pid();
1758
1759         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1760                 struct l_wait_info lwi = { 0 };
1761
1762                 l_wait_event(obd_zombie_waitq,
1763                              !obd_zombie_impexp_check(NULL), &lwi);
1764                 obd_zombie_impexp_cull();
1765
1766                 /*
1767                  * Notify obd_zombie_barrier callers that queues
1768                  * may be empty.
1769                  */
1770                 cfs_waitq_signal(&obd_zombie_waitq);
1771         }
1772
1773         complete(&obd_zombie_stop);
1774
1775         RETURN(0);
1776 }
1777
1778 #else /* ! KERNEL */
1779
1780 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1781 static void *obd_zombie_impexp_work_cb;
1782 static void *obd_zombie_impexp_idle_cb;
1783
1784 int obd_zombie_impexp_kill(void *arg)
1785 {
1786         int rc = 0;
1787
1788         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1789                 obd_zombie_impexp_cull();
1790                 rc = 1;
1791         }
1792         cfs_atomic_dec(&zombie_recur);
1793         return rc;
1794 }
1795
1796 #endif
1797
1798 /**
1799  * start destroy zombie import/export thread
1800  */
1801 int obd_zombie_impexp_init(void)
1802 {
1803         int rc;
1804
1805         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1806         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1807         spin_lock_init(&obd_zombie_impexp_lock);
1808         init_completion(&obd_zombie_start);
1809         init_completion(&obd_zombie_stop);
1810         cfs_waitq_init(&obd_zombie_waitq);
1811         obd_zombie_pid = 0;
1812
1813 #ifdef __KERNEL__
1814         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1815         if (rc < 0)
1816                 RETURN(rc);
1817
1818         wait_for_completion(&obd_zombie_start);
1819 #else
1820
1821         obd_zombie_impexp_work_cb =
1822                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1823                                                  &obd_zombie_impexp_kill, NULL);
1824
1825         obd_zombie_impexp_idle_cb =
1826                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1827                                                  &obd_zombie_impexp_check, NULL);
1828         rc = 0;
1829 #endif
1830         RETURN(rc);
1831 }
1832 /**
1833  * stop destroy zombie import/export thread
1834  */
1835 void obd_zombie_impexp_stop(void)
1836 {
1837         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1838         obd_zombie_impexp_notify();
1839 #ifdef __KERNEL__
1840         wait_for_completion(&obd_zombie_stop);
1841 #else
1842         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1843         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1844 #endif
1845 }
1846
1847 /***** Kernel-userspace comm helpers *******/
1848
1849 /* Get length of entire message, including header */
1850 int kuc_len(int payload_len)
1851 {
1852         return sizeof(struct kuc_hdr) + payload_len;
1853 }
1854 EXPORT_SYMBOL(kuc_len);
1855
1856 /* Get a pointer to kuc header, given a ptr to the payload
1857  * @param p Pointer to payload area
1858  * @returns Pointer to kuc header
1859  */
1860 struct kuc_hdr * kuc_ptr(void *p)
1861 {
1862         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1863         LASSERT(lh->kuc_magic == KUC_MAGIC);
1864         return lh;
1865 }
1866 EXPORT_SYMBOL(kuc_ptr);
1867
1868 /* Test if payload is part of kuc message
1869  * @param p Pointer to payload area
1870  * @returns boolean
1871  */
1872 int kuc_ispayload(void *p)
1873 {
1874         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1875
1876         if (kh->kuc_magic == KUC_MAGIC)
1877                 return 1;
1878         else
1879                 return 0;
1880 }
1881 EXPORT_SYMBOL(kuc_ispayload);
1882
1883 /* Alloc space for a message, and fill in header
1884  * @return Pointer to payload area
1885  */
1886 void *kuc_alloc(int payload_len, int transport, int type)
1887 {
1888         struct kuc_hdr *lh;
1889         int len = kuc_len(payload_len);
1890
1891         OBD_ALLOC(lh, len);
1892         if (lh == NULL)
1893                 return ERR_PTR(-ENOMEM);
1894
1895         lh->kuc_magic = KUC_MAGIC;
1896         lh->kuc_transport = transport;
1897         lh->kuc_msgtype = type;
1898         lh->kuc_msglen = len;
1899
1900         return (void *)(lh + 1);
1901 }
1902 EXPORT_SYMBOL(kuc_alloc);
1903
/* Free a kuc message
 * @param p           Pointer to payload area (not to the header); the
 *                    header in front of it is located with kuc_ptr(),
 *                    which asserts its magic
 * @param payload_len Size of the payload area; the size released is
 *                    kuc_len(payload_len), so this presumably has to be
 *                    the value given to kuc_alloc() -- verify against
 *                    callers
 *
 * Deliberately not declared "inline": this function is exported, and a
 * plain "inline" definition under C99 and later inline semantics is only
 * an inline definition that need not provide the external symbol.
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1911
1912
1913