Whamcloud - gitweb
LU-2675 obd: remove dead code
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_class.h>
47 #include <lprocfs_status.h>
48
49 extern cfs_list_t obd_types;
50 spinlock_t obd_types_lock;
51
52 struct kmem_cache *obd_device_cachep;
53 struct kmem_cache *obdo_cachep;
54 EXPORT_SYMBOL(obdo_cachep);
55 struct kmem_cache *import_cachep;
56
57 cfs_list_t      obd_zombie_imports;
58 cfs_list_t      obd_zombie_exports;
59 spinlock_t  obd_zombie_impexp_lock;
60 static void obd_zombie_impexp_notify(void);
61 static void obd_zombie_export_add(struct obd_export *exp);
62 static void obd_zombie_import_add(struct obd_import *imp);
63 static void print_export_data(struct obd_export *exp,
64                               const char *status, int locks);
65
66 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
68
69 /*
70  * support functions: we could use inter-module communication, but this
71  * is more portable to other OS's
72  */
73 static struct obd_device *obd_device_alloc(void)
74 {
75         struct obd_device *obd;
76
77         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
78         if (obd != NULL) {
79                 obd->obd_magic = OBD_DEVICE_MAGIC;
80         }
81         return obd;
82 }
83
84 static void obd_device_free(struct obd_device *obd)
85 {
86         LASSERT(obd != NULL);
87         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
89         if (obd->obd_namespace != NULL) {
90                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91                        obd, obd->obd_namespace, obd->obd_force);
92                 LBUG();
93         }
94         lu_ref_fini(&obd->obd_reference);
95         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
96 }
97
98 struct obd_type *class_search_type(const char *name)
99 {
100         cfs_list_t *tmp;
101         struct obd_type *type;
102
103         spin_lock(&obd_types_lock);
104         cfs_list_for_each(tmp, &obd_types) {
105                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
106                 if (strcmp(type->typ_name, name) == 0) {
107                         spin_unlock(&obd_types_lock);
108                         return type;
109                 }
110         }
111         spin_unlock(&obd_types_lock);
112         return NULL;
113 }
114 EXPORT_SYMBOL(class_search_type);
115
116 struct obd_type *class_get_type(const char *name)
117 {
118         struct obd_type *type = class_search_type(name);
119
120 #ifdef HAVE_MODULE_LOADING_SUPPORT
121         if (!type) {
122                 const char *modname = name;
123
124                 if (strcmp(modname, "obdfilter") == 0)
125                         modname = "ofd";
126
127                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
128                         modname = LUSTRE_OSP_NAME;
129
130                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
131                         modname = LUSTRE_MDT_NAME;
132
133                 if (!request_module("%s", modname)) {
134                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
135                         type = class_search_type(name);
136                 } else {
137                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
138                                            modname);
139                 }
140         }
141 #endif
142         if (type) {
143                 spin_lock(&type->obd_type_lock);
144                 type->typ_refcnt++;
145                 try_module_get(type->typ_dt_ops->o_owner);
146                 spin_unlock(&type->obd_type_lock);
147         }
148         return type;
149 }
150 EXPORT_SYMBOL(class_get_type);
151
152 void class_put_type(struct obd_type *type)
153 {
154         LASSERT(type);
155         spin_lock(&type->obd_type_lock);
156         type->typ_refcnt--;
157         module_put(type->typ_dt_ops->o_owner);
158         spin_unlock(&type->obd_type_lock);
159 }
160 EXPORT_SYMBOL(class_put_type);
161
162 #define CLASS_MAX_NAME 1024
163
164 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
165                         bool enable_proc, struct lprocfs_seq_vars *module_vars,
166 #ifndef HAVE_ONLY_PROCFS_SEQ
167                         struct lprocfs_vars *vars,
168 #endif
169                         const char *name, struct lu_device_type *ldt)
170 {
171         struct obd_type *type;
172         int rc = 0;
173         ENTRY;
174
175         /* sanity check */
176         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
177
178         if (class_search_type(name)) {
179                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
180                 RETURN(-EEXIST);
181         }
182
183         rc = -ENOMEM;
184         OBD_ALLOC(type, sizeof(*type));
185         if (type == NULL)
186                 RETURN(rc);
187
188         OBD_ALLOC_PTR(type->typ_dt_ops);
189         OBD_ALLOC_PTR(type->typ_md_ops);
190         OBD_ALLOC(type->typ_name, strlen(name) + 1);
191
192         if (type->typ_dt_ops == NULL ||
193             type->typ_md_ops == NULL ||
194             type->typ_name == NULL)
195                 GOTO (failed, rc);
196
197         *(type->typ_dt_ops) = *dt_ops;
198         /* md_ops is optional */
199         if (md_ops)
200                 *(type->typ_md_ops) = *md_ops;
201         strcpy(type->typ_name, name);
202         spin_lock_init(&type->obd_type_lock);
203
204 #ifdef LPROCFS
205         if (enable_proc) {
206 #ifndef HAVE_ONLY_PROCFS_SEQ
207                 if (vars) {
208                         type->typ_procroot = lprocfs_register(type->typ_name,
209                                                               proc_lustre_root,
210                                                               vars, type);
211                 } else
212 #endif
213                 {
214                         type->typ_procroot = lprocfs_seq_register(type->typ_name,
215                                                                   proc_lustre_root,
216                                                                   module_vars, type);
217                 }
218                 if (IS_ERR(type->typ_procroot)) {
219                         rc = PTR_ERR(type->typ_procroot);
220                         type->typ_procroot = NULL;
221                         GOTO(failed, rc);
222                 }
223         }
224 #endif
225         if (ldt != NULL) {
226                 type->typ_lu = ldt;
227                 rc = lu_device_type_init(ldt);
228                 if (rc != 0)
229                         GOTO (failed, rc);
230         }
231
232         spin_lock(&obd_types_lock);
233         cfs_list_add(&type->typ_chain, &obd_types);
234         spin_unlock(&obd_types_lock);
235
236         RETURN (0);
237
238 failed:
239         if (type->typ_name != NULL) {
240 #ifdef LPROCFS
241                 if (type->typ_procroot != NULL) {
242 #ifndef HAVE_ONLY_PROCFS_SEQ
243                         lprocfs_try_remove_proc_entry(type->typ_name,
244                                                       proc_lustre_root);
245 #else
246                         remove_proc_subtree(type->typ_name, proc_lustre_root);
247 #endif
248                 }
249 #endif
250                 OBD_FREE(type->typ_name, strlen(name) + 1);
251         }
252         if (type->typ_md_ops != NULL)
253                 OBD_FREE_PTR(type->typ_md_ops);
254         if (type->typ_dt_ops != NULL)
255                 OBD_FREE_PTR(type->typ_dt_ops);
256         OBD_FREE(type, sizeof(*type));
257         RETURN(rc);
258 }
259 EXPORT_SYMBOL(class_register_type);
260
261 int class_unregister_type(const char *name)
262 {
263         struct obd_type *type = class_search_type(name);
264         ENTRY;
265
266         if (!type) {
267                 CERROR("unknown obd type\n");
268                 RETURN(-EINVAL);
269         }
270
271         if (type->typ_refcnt) {
272                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
273                 /* This is a bad situation, let's make the best of it */
274                 /* Remove ops, but leave the name for debugging */
275                 OBD_FREE_PTR(type->typ_dt_ops);
276                 OBD_FREE_PTR(type->typ_md_ops);
277                 RETURN(-EBUSY);
278         }
279
280         /* we do not use type->typ_procroot as for compatibility purposes
281          * other modules can share names (i.e. lod can use lov entry). so
282          * we can't reference pointer as it can get invalided when another
283          * module removes the entry */
284 #ifdef LPROCFS
285         if (type->typ_procroot != NULL) {
286 #ifndef HAVE_ONLY_PROCFS_SEQ
287                 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
288 #else
289                 remove_proc_subtree(type->typ_name, proc_lustre_root);
290 #endif
291         }
292
293         if (type->typ_procsym != NULL)
294                 lprocfs_remove(&type->typ_procsym);
295 #endif
296         if (type->typ_lu)
297                 lu_device_type_fini(type->typ_lu);
298
299         spin_lock(&obd_types_lock);
300         cfs_list_del(&type->typ_chain);
301         spin_unlock(&obd_types_lock);
302         OBD_FREE(type->typ_name, strlen(name) + 1);
303         if (type->typ_dt_ops != NULL)
304                 OBD_FREE_PTR(type->typ_dt_ops);
305         if (type->typ_md_ops != NULL)
306                 OBD_FREE_PTR(type->typ_md_ops);
307         OBD_FREE(type, sizeof(*type));
308         RETURN(0);
309 } /* class_unregister_type */
310 EXPORT_SYMBOL(class_unregister_type);
311
312 /**
313  * Create a new obd device.
314  *
315  * Find an empty slot in ::obd_devs[], create a new obd device in it.
316  *
317  * \param[in] type_name obd device type string.
318  * \param[in] name      obd device name.
319  *
320  * \retval NULL if create fails, otherwise return the obd device
321  *         pointer created.
322  */
323 struct obd_device *class_newdev(const char *type_name, const char *name)
324 {
325         struct obd_device *result = NULL;
326         struct obd_device *newdev;
327         struct obd_type *type = NULL;
328         int i;
329         int new_obd_minor = 0;
330         ENTRY;
331
332         if (strlen(name) >= MAX_OBD_NAME) {
333                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
334                 RETURN(ERR_PTR(-EINVAL));
335         }
336
337         type = class_get_type(type_name);
338         if (type == NULL){
339                 CERROR("OBD: unknown type: %s\n", type_name);
340                 RETURN(ERR_PTR(-ENODEV));
341         }
342
343         newdev = obd_device_alloc();
344         if (newdev == NULL)
345                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
346
347         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
348
349         write_lock(&obd_dev_lock);
350         for (i = 0; i < class_devno_max(); i++) {
351                 struct obd_device *obd = class_num2obd(i);
352
353                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
354                         CERROR("Device %s already exists at %d, won't add\n",
355                                name, i);
356                         if (result) {
357                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
358                                          "%p obd_magic %08x != %08x\n", result,
359                                          result->obd_magic, OBD_DEVICE_MAGIC);
360                                 LASSERTF(result->obd_minor == new_obd_minor,
361                                          "%p obd_minor %d != %d\n", result,
362                                          result->obd_minor, new_obd_minor);
363
364                                 obd_devs[result->obd_minor] = NULL;
365                                 result->obd_name[0]='\0';
366                          }
367                         result = ERR_PTR(-EEXIST);
368                         break;
369                 }
370                 if (!result && !obd) {
371                         result = newdev;
372                         result->obd_minor = i;
373                         new_obd_minor = i;
374                         result->obd_type = type;
375                         strncpy(result->obd_name, name,
376                                 sizeof(result->obd_name) - 1);
377                         obd_devs[i] = result;
378                 }
379         }
380         write_unlock(&obd_dev_lock);
381
382         if (result == NULL && i >= class_devno_max()) {
383                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
384                        class_devno_max());
385                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
386         }
387
388         if (IS_ERR(result))
389                 GOTO(out, result);
390
391         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
392                result->obd_name, result);
393
394         RETURN(result);
395 out:
396         obd_device_free(newdev);
397 out_type:
398         class_put_type(type);
399         return result;
400 }
401
402 void class_release_dev(struct obd_device *obd)
403 {
404         struct obd_type *obd_type = obd->obd_type;
405
406         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
407                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
408         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
409                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
410         LASSERT(obd_type != NULL);
411
412         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
413                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
414
415         write_lock(&obd_dev_lock);
416         obd_devs[obd->obd_minor] = NULL;
417         write_unlock(&obd_dev_lock);
418         obd_device_free(obd);
419
420         class_put_type(obd_type);
421 }
422
423 int class_name2dev(const char *name)
424 {
425         int i;
426
427         if (!name)
428                 return -1;
429
430         read_lock(&obd_dev_lock);
431         for (i = 0; i < class_devno_max(); i++) {
432                 struct obd_device *obd = class_num2obd(i);
433
434                 if (obd && strcmp(name, obd->obd_name) == 0) {
435                         /* Make sure we finished attaching before we give
436                            out any references */
437                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
438                         if (obd->obd_attached) {
439                                 read_unlock(&obd_dev_lock);
440                                 return i;
441                         }
442                         break;
443                 }
444         }
445         read_unlock(&obd_dev_lock);
446
447         return -1;
448 }
449 EXPORT_SYMBOL(class_name2dev);
450
451 struct obd_device *class_name2obd(const char *name)
452 {
453         int dev = class_name2dev(name);
454
455         if (dev < 0 || dev > class_devno_max())
456                 return NULL;
457         return class_num2obd(dev);
458 }
459 EXPORT_SYMBOL(class_name2obd);
460
461 int class_uuid2dev(struct obd_uuid *uuid)
462 {
463         int i;
464
465         read_lock(&obd_dev_lock);
466         for (i = 0; i < class_devno_max(); i++) {
467                 struct obd_device *obd = class_num2obd(i);
468
469                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
470                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
471                         read_unlock(&obd_dev_lock);
472                         return i;
473                 }
474         }
475         read_unlock(&obd_dev_lock);
476
477         return -1;
478 }
479 EXPORT_SYMBOL(class_uuid2dev);
480
481 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
482 {
483         int dev = class_uuid2dev(uuid);
484         if (dev < 0)
485                 return NULL;
486         return class_num2obd(dev);
487 }
488 EXPORT_SYMBOL(class_uuid2obd);
489
490 /**
491  * Get obd device from ::obd_devs[]
492  *
493  * \param num [in] array index
494  *
495  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
496  *         otherwise return the obd device there.
497  */
498 struct obd_device *class_num2obd(int num)
499 {
500         struct obd_device *obd = NULL;
501
502         if (num < class_devno_max()) {
503                 obd = obd_devs[num];
504                 if (obd == NULL)
505                         return NULL;
506
507                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
508                          "%p obd_magic %08x != %08x\n",
509                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
510                 LASSERTF(obd->obd_minor == num,
511                          "%p obd_minor %0d != %0d\n",
512                          obd, obd->obd_minor, num);
513         }
514
515         return obd;
516 }
517 EXPORT_SYMBOL(class_num2obd);
518
519 /**
520  * Get obd devices count. Device in any
521  *    state are counted
522  * \retval obd device count
523  */
524 int get_devices_count(void)
525 {
526         int index, max_index = class_devno_max(), dev_count = 0;
527
528         read_lock(&obd_dev_lock);
529         for (index = 0; index <= max_index; index++) {
530                 struct obd_device *obd = class_num2obd(index);
531                 if (obd != NULL)
532                         dev_count++;
533         }
534         read_unlock(&obd_dev_lock);
535
536         return dev_count;
537 }
538 EXPORT_SYMBOL(get_devices_count);
539
540 void class_obd_list(void)
541 {
542         char *status;
543         int i;
544
545         read_lock(&obd_dev_lock);
546         for (i = 0; i < class_devno_max(); i++) {
547                 struct obd_device *obd = class_num2obd(i);
548
549                 if (obd == NULL)
550                         continue;
551                 if (obd->obd_stopping)
552                         status = "ST";
553                 else if (obd->obd_set_up)
554                         status = "UP";
555                 else if (obd->obd_attached)
556                         status = "AT";
557                 else
558                         status = "--";
559                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
560                          i, status, obd->obd_type->typ_name,
561                          obd->obd_name, obd->obd_uuid.uuid,
562                          atomic_read(&obd->obd_refcount));
563         }
564         read_unlock(&obd_dev_lock);
565         return;
566 }
567
568 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
569    specified, then only the client with that uuid is returned,
570    otherwise any client connected to the tgt is returned. */
571 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
572                                           const char * typ_name,
573                                           struct obd_uuid *grp_uuid)
574 {
575         int i;
576
577         read_lock(&obd_dev_lock);
578         for (i = 0; i < class_devno_max(); i++) {
579                 struct obd_device *obd = class_num2obd(i);
580
581                 if (obd == NULL)
582                         continue;
583                 if ((strncmp(obd->obd_type->typ_name, typ_name,
584                              strlen(typ_name)) == 0)) {
585                         if (obd_uuid_equals(tgt_uuid,
586                                             &obd->u.cli.cl_target_uuid) &&
587                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
588                                                          &obd->obd_uuid) : 1)) {
589                                 read_unlock(&obd_dev_lock);
590                                 return obd;
591                         }
592                 }
593         }
594         read_unlock(&obd_dev_lock);
595
596         return NULL;
597 }
598 EXPORT_SYMBOL(class_find_client_obd);
599
600 /* Iterate the obd_device list looking devices have grp_uuid. Start
601    searching at *next, and if a device is found, the next index to look
602    at is saved in *next. If next is NULL, then the first matching device
603    will always be returned. */
604 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
605 {
606         int i;
607
608         if (next == NULL)
609                 i = 0;
610         else if (*next >= 0 && *next < class_devno_max())
611                 i = *next;
612         else
613                 return NULL;
614
615         read_lock(&obd_dev_lock);
616         for (; i < class_devno_max(); i++) {
617                 struct obd_device *obd = class_num2obd(i);
618
619                 if (obd == NULL)
620                         continue;
621                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
622                         if (next != NULL)
623                                 *next = i+1;
624                         read_unlock(&obd_dev_lock);
625                         return obd;
626                 }
627         }
628         read_unlock(&obd_dev_lock);
629
630         return NULL;
631 }
632 EXPORT_SYMBOL(class_devices_in_group);
633
634 /**
635  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
636  * adjust sptlrpc settings accordingly.
637  */
638 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
639 {
640         struct obd_device  *obd;
641         const char         *type;
642         int                 i, rc = 0, rc2;
643
644         LASSERT(namelen > 0);
645
646         read_lock(&obd_dev_lock);
647         for (i = 0; i < class_devno_max(); i++) {
648                 obd = class_num2obd(i);
649
650                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
651                         continue;
652
653                 /* only notify mdc, osc, mdt, ost */
654                 type = obd->obd_type->typ_name;
655                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
656                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
657                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
658                     strcmp(type, LUSTRE_OST_NAME) != 0)
659                         continue;
660
661                 if (strncmp(obd->obd_name, fsname, namelen))
662                         continue;
663
664                 class_incref(obd, __FUNCTION__, obd);
665                 read_unlock(&obd_dev_lock);
666                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
667                                          sizeof(KEY_SPTLRPC_CONF),
668                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
669                 rc = rc ? rc : rc2;
670                 class_decref(obd, __FUNCTION__, obd);
671                 read_lock(&obd_dev_lock);
672         }
673         read_unlock(&obd_dev_lock);
674         return rc;
675 }
676 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
677
678 void obd_cleanup_caches(void)
679 {
680         ENTRY;
681         if (obd_device_cachep) {
682                 kmem_cache_destroy(obd_device_cachep);
683                 obd_device_cachep = NULL;
684         }
685         if (obdo_cachep) {
686                 kmem_cache_destroy(obdo_cachep);
687                 obdo_cachep = NULL;
688         }
689         if (import_cachep) {
690                 kmem_cache_destroy(import_cachep);
691                 import_cachep = NULL;
692         }
693         if (capa_cachep) {
694                 kmem_cache_destroy(capa_cachep);
695                 capa_cachep = NULL;
696         }
697         EXIT;
698 }
699
700 int obd_init_caches(void)
701 {
702         int rc;
703         ENTRY;
704
705         LASSERT(obd_device_cachep == NULL);
706         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
707                                               sizeof(struct obd_device),
708                                               0, 0, NULL);
709         if (!obd_device_cachep)
710                 GOTO(out, rc = -ENOMEM);
711
712         LASSERT(obdo_cachep == NULL);
713         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
714                                         0, 0, NULL);
715         if (!obdo_cachep)
716                 GOTO(out, rc = -ENOMEM);
717
718         LASSERT(import_cachep == NULL);
719         import_cachep = kmem_cache_create("ll_import_cache",
720                                           sizeof(struct obd_import),
721                                           0, 0, NULL);
722         if (!import_cachep)
723                 GOTO(out, rc = -ENOMEM);
724
725         LASSERT(capa_cachep == NULL);
726         capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
727                                         0, 0, NULL);
728         if (!capa_cachep)
729                 GOTO(out, rc = -ENOMEM);
730
731         RETURN(0);
732 out:
733         obd_cleanup_caches();
734         RETURN(rc);
735 }
736
737 /* map connection to client */
738 struct obd_export *class_conn2export(struct lustre_handle *conn)
739 {
740         struct obd_export *export;
741         ENTRY;
742
743         if (!conn) {
744                 CDEBUG(D_CACHE, "looking for null handle\n");
745                 RETURN(NULL);
746         }
747
748         if (conn->cookie == -1) {  /* this means assign a new connection */
749                 CDEBUG(D_CACHE, "want a new connection\n");
750                 RETURN(NULL);
751         }
752
753         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
754         export = class_handle2object(conn->cookie, NULL);
755         RETURN(export);
756 }
757 EXPORT_SYMBOL(class_conn2export);
758
759 struct obd_device *class_exp2obd(struct obd_export *exp)
760 {
761         if (exp)
762                 return exp->exp_obd;
763         return NULL;
764 }
765 EXPORT_SYMBOL(class_exp2obd);
766
767 struct obd_device *class_conn2obd(struct lustre_handle *conn)
768 {
769         struct obd_export *export;
770         export = class_conn2export(conn);
771         if (export) {
772                 struct obd_device *obd = export->exp_obd;
773                 class_export_put(export);
774                 return obd;
775         }
776         return NULL;
777 }
778 EXPORT_SYMBOL(class_conn2obd);
779
780 struct obd_import *class_exp2cliimp(struct obd_export *exp)
781 {
782         struct obd_device *obd = exp->exp_obd;
783         if (obd == NULL)
784                 return NULL;
785         return obd->u.cli.cl_import;
786 }
787 EXPORT_SYMBOL(class_exp2cliimp);
788
789 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
790 {
791         struct obd_device *obd = class_conn2obd(conn);
792         if (obd == NULL)
793                 return NULL;
794         return obd->u.cli.cl_import;
795 }
796 EXPORT_SYMBOL(class_conn2cliimp);
797
798 /* Export management functions */
799 static void class_export_destroy(struct obd_export *exp)
800 {
801         struct obd_device *obd = exp->exp_obd;
802         ENTRY;
803
804         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
805         LASSERT(obd != NULL);
806
807         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
808                exp->exp_client_uuid.uuid, obd->obd_name);
809
810         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
811         if (exp->exp_connection)
812                 ptlrpc_put_connection_superhack(exp->exp_connection);
813
814         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
815         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
816         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
817         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
818         obd_destroy_export(exp);
819         class_decref(obd, "export", exp);
820
821         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
822         EXIT;
823 }
824
825 static void export_handle_addref(void *export)
826 {
827         class_export_get(export);
828 }
829
830 static struct portals_handle_ops export_handle_ops = {
831         .hop_addref = export_handle_addref,
832         .hop_free   = NULL,
833 };
834
835 struct obd_export *class_export_get(struct obd_export *exp)
836 {
837         atomic_inc(&exp->exp_refcount);
838         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
839                atomic_read(&exp->exp_refcount));
840         return exp;
841 }
842 EXPORT_SYMBOL(class_export_get);
843
844 void class_export_put(struct obd_export *exp)
845 {
846         LASSERT(exp != NULL);
847         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
848         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
849                atomic_read(&exp->exp_refcount) - 1);
850
851         if (atomic_dec_and_test(&exp->exp_refcount)) {
852                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
853                 CDEBUG(D_IOCTL, "final put %p/%s\n",
854                        exp, exp->exp_client_uuid.uuid);
855
856                 /* release nid stat refererence */
857                 lprocfs_exp_cleanup(exp);
858
859                 obd_zombie_export_add(exp);
860         }
861 }
862 EXPORT_SYMBOL(class_export_put);
863
864 /* Creates a new export, adds it to the hash table, and returns a
865  * pointer to it. The refcount is 2: one for the hash reference, and
866  * one for the pointer returned by this function. */
867 struct obd_export *class_new_export(struct obd_device *obd,
868                                     struct obd_uuid *cluuid)
869 {
870         struct obd_export *export;
871         cfs_hash_t *hash = NULL;
872         int rc = 0;
873         ENTRY;
874
875         OBD_ALLOC_PTR(export);
876         if (!export)
877                 return ERR_PTR(-ENOMEM);
878
879         export->exp_conn_cnt = 0;
880         export->exp_lock_hash = NULL;
881         export->exp_flock_hash = NULL;
882         atomic_set(&export->exp_refcount, 2);
883         atomic_set(&export->exp_rpc_count, 0);
884         atomic_set(&export->exp_cb_count, 0);
885         atomic_set(&export->exp_locks_count, 0);
886 #if LUSTRE_TRACKS_LOCK_EXP_REFS
887         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
888         spin_lock_init(&export->exp_locks_list_guard);
889 #endif
890         atomic_set(&export->exp_replay_count, 0);
891         export->exp_obd = obd;
892         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
893         spin_lock_init(&export->exp_uncommitted_replies_lock);
894         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
895         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
896         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
897         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
898         CFS_INIT_LIST_HEAD(&export->exp_reg_rpcs);
899         class_handle_hash(&export->exp_handle, &export_handle_ops);
900         export->exp_last_request_time = cfs_time_current_sec();
901         spin_lock_init(&export->exp_lock);
902         spin_lock_init(&export->exp_rpc_lock);
903         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
904         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
905         spin_lock_init(&export->exp_bl_list_lock);
906         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
907
908         export->exp_sp_peer = LUSTRE_SP_ANY;
909         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
910         export->exp_client_uuid = *cluuid;
911         obd_init_export(export);
912
913         spin_lock(&obd->obd_dev_lock);
914         /* shouldn't happen, but might race */
915         if (obd->obd_stopping)
916                 GOTO(exit_unlock, rc = -ENODEV);
917
918         hash = cfs_hash_getref(obd->obd_uuid_hash);
919         if (hash == NULL)
920                 GOTO(exit_unlock, rc = -ENODEV);
921         spin_unlock(&obd->obd_dev_lock);
922
923         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
924                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
925                 if (rc != 0) {
926                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
927                                       obd->obd_name, cluuid->uuid, rc);
928                         GOTO(exit_err, rc = -EALREADY);
929                 }
930         }
931
932         spin_lock(&obd->obd_dev_lock);
933         if (obd->obd_stopping) {
934                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
935                 GOTO(exit_unlock, rc = -ENODEV);
936         }
937
938         class_incref(obd, "export", export);
939         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
940         cfs_list_add_tail(&export->exp_obd_chain_timed,
941                           &export->exp_obd->obd_exports_timed);
942         export->exp_obd->obd_num_exports++;
943         spin_unlock(&obd->obd_dev_lock);
944         cfs_hash_putref(hash);
945         RETURN(export);
946
947 exit_unlock:
948         spin_unlock(&obd->obd_dev_lock);
949 exit_err:
950         if (hash)
951                 cfs_hash_putref(hash);
952         class_handle_unhash(&export->exp_handle);
953         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
954         obd_destroy_export(export);
955         OBD_FREE_PTR(export);
956         return ERR_PTR(rc);
957 }
958 EXPORT_SYMBOL(class_new_export);
959
960 void class_unlink_export(struct obd_export *exp)
961 {
962         class_handle_unhash(&exp->exp_handle);
963
964         spin_lock(&exp->exp_obd->obd_dev_lock);
965         /* delete an uuid-export hashitem from hashtables */
966         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
967                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
968                              &exp->exp_client_uuid,
969                              &exp->exp_uuid_hash);
970
971         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
972         cfs_list_del_init(&exp->exp_obd_chain_timed);
973         exp->exp_obd->obd_num_exports--;
974         spin_unlock(&exp->exp_obd->obd_dev_lock);
975         class_export_put(exp);
976 }
977 EXPORT_SYMBOL(class_unlink_export);
978
979 /* Import management functions */
980 void class_import_destroy(struct obd_import *imp)
981 {
982         ENTRY;
983
984         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
985                 imp->imp_obd->obd_name);
986
987         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
988
989         ptlrpc_put_connection_superhack(imp->imp_connection);
990
991         while (!cfs_list_empty(&imp->imp_conn_list)) {
992                 struct obd_import_conn *imp_conn;
993
994                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
995                                           struct obd_import_conn, oic_item);
996                 cfs_list_del_init(&imp_conn->oic_item);
997                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
998                 OBD_FREE(imp_conn, sizeof(*imp_conn));
999         }
1000
1001         LASSERT(imp->imp_sec == NULL);
1002         class_decref(imp->imp_obd, "import", imp);
1003         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
1004         EXIT;
1005 }
1006
/* hop_addref callback for import handles: takes one import reference. */
static void import_handle_addref(void *import)
{
        /* The returned pointer is the argument itself and is not needed. */
        (void)class_import_get(import);
}
1011
1012 static struct portals_handle_ops import_handle_ops = {
1013         .hop_addref = import_handle_addref,
1014         .hop_free   = NULL,
1015 };
1016
1017 struct obd_import *class_import_get(struct obd_import *import)
1018 {
1019         atomic_inc(&import->imp_refcount);
1020         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1021                atomic_read(&import->imp_refcount),
1022                import->imp_obd->obd_name);
1023         return import;
1024 }
1025 EXPORT_SYMBOL(class_import_get);
1026
1027 void class_import_put(struct obd_import *imp)
1028 {
1029         ENTRY;
1030
1031         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1032         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1033
1034         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1035                atomic_read(&imp->imp_refcount) - 1,
1036                imp->imp_obd->obd_name);
1037
1038         if (atomic_dec_and_test(&imp->imp_refcount)) {
1039                 CDEBUG(D_INFO, "final put import %p\n", imp);
1040                 obd_zombie_import_add(imp);
1041         }
1042
1043         /* catch possible import put race */
1044         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1045         EXIT;
1046 }
1047 EXPORT_SYMBOL(class_import_put);
1048
1049 static void init_imp_at(struct imp_at *at) {
1050         int i;
1051         at_init(&at->iat_net_latency, 0, 0);
1052         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1053                 /* max service estimates are tracked on the server side, so
1054                    don't use the AT history here, just use the last reported
1055                    val. (But keep hist for proc histogram, worst_ever) */
1056                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1057                         AT_FLG_NOHIST);
1058         }
1059 }
1060
1061 struct obd_import *class_new_import(struct obd_device *obd)
1062 {
1063         struct obd_import *imp;
1064
1065         OBD_ALLOC(imp, sizeof(*imp));
1066         if (imp == NULL)
1067                 return NULL;
1068
1069         CFS_INIT_LIST_HEAD(&imp->imp_pinger_chain);
1070         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1071         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1072         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1073         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1074         CFS_INIT_LIST_HEAD(&imp->imp_committed_list);
1075         imp->imp_replay_cursor = &imp->imp_committed_list;
1076         spin_lock_init(&imp->imp_lock);
1077         imp->imp_last_success_conn = 0;
1078         imp->imp_state = LUSTRE_IMP_NEW;
1079         imp->imp_obd = class_incref(obd, "import", imp);
1080         mutex_init(&imp->imp_sec_mutex);
1081         init_waitqueue_head(&imp->imp_recovery_waitq);
1082
1083         atomic_set(&imp->imp_refcount, 2);
1084         atomic_set(&imp->imp_unregistering, 0);
1085         atomic_set(&imp->imp_inflight, 0);
1086         atomic_set(&imp->imp_replay_inflight, 0);
1087         atomic_set(&imp->imp_inval_count, 0);
1088         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1089         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1090         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1091         init_imp_at(&imp->imp_at);
1092
1093         /* the default magic is V2, will be used in connect RPC, and
1094          * then adjusted according to the flags in request/reply. */
1095         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1096
1097         return imp;
1098 }
1099 EXPORT_SYMBOL(class_new_import);
1100
1101 void class_destroy_import(struct obd_import *import)
1102 {
1103         LASSERT(import != NULL);
1104         LASSERT(import != LP_POISON);
1105
1106         class_handle_unhash(&import->imp_handle);
1107
1108         spin_lock(&import->imp_lock);
1109         import->imp_generation++;
1110         spin_unlock(&import->imp_lock);
1111         class_import_put(import);
1112 }
1113 EXPORT_SYMBOL(class_destroy_import);
1114
1115 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1116
1117 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1118 {
1119         spin_lock(&exp->exp_locks_list_guard);
1120
1121         LASSERT(lock->l_exp_refs_nr >= 0);
1122
1123         if (lock->l_exp_refs_target != NULL &&
1124             lock->l_exp_refs_target != exp) {
1125                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1126                               exp, lock, lock->l_exp_refs_target);
1127         }
1128         if ((lock->l_exp_refs_nr ++) == 0) {
1129                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1130                 lock->l_exp_refs_target = exp;
1131         }
1132         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1133                lock, exp, lock->l_exp_refs_nr);
1134         spin_unlock(&exp->exp_locks_list_guard);
1135 }
1136 EXPORT_SYMBOL(__class_export_add_lock_ref);
1137
1138 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1139 {
1140         spin_lock(&exp->exp_locks_list_guard);
1141         LASSERT(lock->l_exp_refs_nr > 0);
1142         if (lock->l_exp_refs_target != exp) {
1143                 LCONSOLE_WARN("lock %p, "
1144                               "mismatching export pointers: %p, %p\n",
1145                               lock, lock->l_exp_refs_target, exp);
1146         }
1147         if (-- lock->l_exp_refs_nr == 0) {
1148                 cfs_list_del_init(&lock->l_exp_refs_link);
1149                 lock->l_exp_refs_target = NULL;
1150         }
1151         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1152                lock, exp, lock->l_exp_refs_nr);
1153         spin_unlock(&exp->exp_locks_list_guard);
1154 }
1155 EXPORT_SYMBOL(__class_export_del_lock_ref);
1156 #endif
1157
1158 /* A connection defines an export context in which preallocation can
1159    be managed. This releases the export pointer reference, and returns
1160    the export handle, so the export refcount is 1 when this function
1161    returns. */
1162 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1163                   struct obd_uuid *cluuid)
1164 {
1165         struct obd_export *export;
1166         LASSERT(conn != NULL);
1167         LASSERT(obd != NULL);
1168         LASSERT(cluuid != NULL);
1169         ENTRY;
1170
1171         export = class_new_export(obd, cluuid);
1172         if (IS_ERR(export))
1173                 RETURN(PTR_ERR(export));
1174
1175         conn->cookie = export->exp_handle.h_cookie;
1176         class_export_put(export);
1177
1178         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1179                cluuid->uuid, conn->cookie);
1180         RETURN(0);
1181 }
1182 EXPORT_SYMBOL(class_connect);
1183
1184 /* if export is involved in recovery then clean up related things */
1185 void class_export_recovery_cleanup(struct obd_export *exp)
1186 {
1187         struct obd_device *obd = exp->exp_obd;
1188
1189         spin_lock(&obd->obd_recovery_task_lock);
1190         if (obd->obd_recovering) {
1191                 if (exp->exp_in_recovery) {
1192                         spin_lock(&exp->exp_lock);
1193                         exp->exp_in_recovery = 0;
1194                         spin_unlock(&exp->exp_lock);
1195                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1196                         atomic_dec(&obd->obd_connected_clients);
1197                 }
1198
1199                 /* if called during recovery then should update
1200                  * obd_stale_clients counter,
1201                  * lightweight exports are not counted */
1202                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1203                         exp->exp_obd->obd_stale_clients++;
1204         }
1205         spin_unlock(&obd->obd_recovery_task_lock);
1206         /** Cleanup req replay fields */
1207         if (exp->exp_req_replay_needed) {
1208                 spin_lock(&exp->exp_lock);
1209                 exp->exp_req_replay_needed = 0;
1210                 spin_unlock(&exp->exp_lock);
1211                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1212                 atomic_dec(&obd->obd_req_replay_clients);
1213         }
1214         /** Cleanup lock replay data */
1215         if (exp->exp_lock_replay_needed) {
1216                 spin_lock(&exp->exp_lock);
1217                 exp->exp_lock_replay_needed = 0;
1218                 spin_unlock(&exp->exp_lock);
1219                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1220                 atomic_dec(&obd->obd_lock_replay_clients);
1221         }
1222 }
1223
1224 /* This function removes 1-3 references from the export:
1225  * 1 - for export pointer passed
1226  * and if disconnect really need
1227  * 2 - removing from hash
1228  * 3 - in client_unlink_export
1229  * The export pointer passed to this function can destroyed */
1230 int class_disconnect(struct obd_export *export)
1231 {
1232         int already_disconnected;
1233         ENTRY;
1234
1235         if (export == NULL) {
1236                 CWARN("attempting to free NULL export %p\n", export);
1237                 RETURN(-EINVAL);
1238         }
1239
1240         spin_lock(&export->exp_lock);
1241         already_disconnected = export->exp_disconnected;
1242         export->exp_disconnected = 1;
1243         spin_unlock(&export->exp_lock);
1244
1245         /* class_cleanup(), abort_recovery(), and class_fail_export()
1246          * all end up in here, and if any of them race we shouldn't
1247          * call extra class_export_puts(). */
1248         if (already_disconnected) {
1249                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1250                 GOTO(no_disconn, already_disconnected);
1251         }
1252
1253         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1254                export->exp_handle.h_cookie);
1255
1256         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1257                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1258                              &export->exp_connection->c_peer.nid,
1259                              &export->exp_nid_hash);
1260
1261         class_export_recovery_cleanup(export);
1262         class_unlink_export(export);
1263 no_disconn:
1264         class_export_put(export);
1265         RETURN(0);
1266 }
1267 EXPORT_SYMBOL(class_disconnect);
1268
1269 /* Return non-zero for a fully connected export */
1270 int class_connected_export(struct obd_export *exp)
1271 {
1272         int connected = 0;
1273
1274         if (exp) {
1275                 spin_lock(&exp->exp_lock);
1276                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1277                 spin_unlock(&exp->exp_lock);
1278         }
1279         return connected;
1280 }
1281 EXPORT_SYMBOL(class_connected_export);
1282
1283 static void class_disconnect_export_list(cfs_list_t *list,
1284                                          enum obd_option flags)
1285 {
1286         int rc;
1287         struct obd_export *exp;
1288         ENTRY;
1289
1290         /* It's possible that an export may disconnect itself, but
1291          * nothing else will be added to this list. */
1292         while (!cfs_list_empty(list)) {
1293                 exp = cfs_list_entry(list->next, struct obd_export,
1294                                      exp_obd_chain);
1295                 /* need for safe call CDEBUG after obd_disconnect */
1296                 class_export_get(exp);
1297
1298                 spin_lock(&exp->exp_lock);
1299                 exp->exp_flags = flags;
1300                 spin_unlock(&exp->exp_lock);
1301
1302                 if (obd_uuid_equals(&exp->exp_client_uuid,
1303                                     &exp->exp_obd->obd_uuid)) {
1304                         CDEBUG(D_HA,
1305                                "exp %p export uuid == obd uuid, don't discon\n",
1306                                exp);
1307                         /* Need to delete this now so we don't end up pointing
1308                          * to work_list later when this export is cleaned up. */
1309                         cfs_list_del_init(&exp->exp_obd_chain);
1310                         class_export_put(exp);
1311                         continue;
1312                 }
1313
1314                 class_export_get(exp);
1315                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1316                        "last request at "CFS_TIME_T"\n",
1317                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1318                        exp, exp->exp_last_request_time);
1319                 /* release one export reference anyway */
1320                 rc = obd_disconnect(exp);
1321
1322                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1323                        obd_export_nid2str(exp), exp, rc);
1324                 class_export_put(exp);
1325         }
1326         EXIT;
1327 }
1328
1329 void class_disconnect_exports(struct obd_device *obd)
1330 {
1331         cfs_list_t work_list;
1332         ENTRY;
1333
1334         /* Move all of the exports from obd_exports to a work list, en masse. */
1335         CFS_INIT_LIST_HEAD(&work_list);
1336         spin_lock(&obd->obd_dev_lock);
1337         cfs_list_splice_init(&obd->obd_exports, &work_list);
1338         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1339         spin_unlock(&obd->obd_dev_lock);
1340
1341         if (!cfs_list_empty(&work_list)) {
1342                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1343                        "disconnecting them\n", obd->obd_minor, obd);
1344                 class_disconnect_export_list(&work_list,
1345                                              exp_flags_from_obd(obd));
1346         } else
1347                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1348                        obd->obd_minor, obd);
1349         EXIT;
1350 }
1351 EXPORT_SYMBOL(class_disconnect_exports);
1352
1353 /* Remove exports that have not completed recovery.
1354  */
1355 void class_disconnect_stale_exports(struct obd_device *obd,
1356                                     int (*test_export)(struct obd_export *))
1357 {
1358         cfs_list_t work_list;
1359         struct obd_export *exp, *n;
1360         int evicted = 0;
1361         ENTRY;
1362
1363         CFS_INIT_LIST_HEAD(&work_list);
1364         spin_lock(&obd->obd_dev_lock);
1365         cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1366                                      exp_obd_chain) {
1367                 /* don't count self-export as client */
1368                 if (obd_uuid_equals(&exp->exp_client_uuid,
1369                                     &exp->exp_obd->obd_uuid))
1370                         continue;
1371
1372                 /* don't evict clients which have no slot in last_rcvd
1373                  * (e.g. lightweight connection) */
1374                 if (exp->exp_target_data.ted_lr_idx == -1)
1375                         continue;
1376
1377                 spin_lock(&exp->exp_lock);
1378                 if (exp->exp_failed || test_export(exp)) {
1379                         spin_unlock(&exp->exp_lock);
1380                         continue;
1381                 }
1382                 exp->exp_failed = 1;
1383                 spin_unlock(&exp->exp_lock);
1384
1385                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1386                 evicted++;
1387                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1388                        obd->obd_name, exp->exp_client_uuid.uuid,
1389                        exp->exp_connection == NULL ? "<unknown>" :
1390                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1391                 print_export_data(exp, "EVICTING", 0);
1392         }
1393         spin_unlock(&obd->obd_dev_lock);
1394
1395         if (evicted)
1396                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1397                               obd->obd_name, evicted);
1398
1399         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1400                                                  OBD_OPT_ABORT_RECOV);
1401         EXIT;
1402 }
1403 EXPORT_SYMBOL(class_disconnect_stale_exports);
1404
1405 void class_fail_export(struct obd_export *exp)
1406 {
1407         int rc, already_failed;
1408
1409         spin_lock(&exp->exp_lock);
1410         already_failed = exp->exp_failed;
1411         exp->exp_failed = 1;
1412         spin_unlock(&exp->exp_lock);
1413
1414         if (already_failed) {
1415                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1416                        exp, exp->exp_client_uuid.uuid);
1417                 return;
1418         }
1419
1420         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1421                exp, exp->exp_client_uuid.uuid);
1422
1423         if (obd_dump_on_timeout)
1424                 libcfs_debug_dumplog();
1425
1426         /* need for safe call CDEBUG after obd_disconnect */
1427         class_export_get(exp);
1428
1429         /* Most callers into obd_disconnect are removing their own reference
1430          * (request, for example) in addition to the one from the hash table.
1431          * We don't have such a reference here, so make one. */
1432         class_export_get(exp);
1433         rc = obd_disconnect(exp);
1434         if (rc)
1435                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1436         else
1437                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1438                        exp, exp->exp_client_uuid.uuid);
1439         class_export_put(exp);
1440 }
1441 EXPORT_SYMBOL(class_fail_export);
1442
1443 char *obd_export_nid2str(struct obd_export *exp)
1444 {
1445         if (exp->exp_connection != NULL)
1446                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1447
1448         return "(no nid)";
1449 }
1450 EXPORT_SYMBOL(obd_export_nid2str);
1451
1452 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1453 {
1454         cfs_hash_t *nid_hash;
1455         struct obd_export *doomed_exp = NULL;
1456         int exports_evicted = 0;
1457
1458         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1459
1460         spin_lock(&obd->obd_dev_lock);
1461         /* umount has run already, so evict thread should leave
1462          * its task to umount thread now */
1463         if (obd->obd_stopping) {
1464                 spin_unlock(&obd->obd_dev_lock);
1465                 return exports_evicted;
1466         }
1467         nid_hash = obd->obd_nid_hash;
1468         cfs_hash_getref(nid_hash);
1469         spin_unlock(&obd->obd_dev_lock);
1470
1471         do {
1472                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1473                 if (doomed_exp == NULL)
1474                         break;
1475
1476                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1477                          "nid %s found, wanted nid %s, requested nid %s\n",
1478                          obd_export_nid2str(doomed_exp),
1479                          libcfs_nid2str(nid_key), nid);
1480                 LASSERTF(doomed_exp != obd->obd_self_export,
1481                          "self-export is hashed by NID?\n");
1482                 exports_evicted++;
1483                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1484                               "request\n", obd->obd_name,
1485                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1486                               obd_export_nid2str(doomed_exp));
1487                 class_fail_export(doomed_exp);
1488                 class_export_put(doomed_exp);
1489         } while (1);
1490
1491         cfs_hash_putref(nid_hash);
1492
1493         if (!exports_evicted)
1494                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1495                        obd->obd_name, nid);
1496         return exports_evicted;
1497 }
1498 EXPORT_SYMBOL(obd_export_evict_by_nid);
1499
1500 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1501 {
1502         cfs_hash_t *uuid_hash;
1503         struct obd_export *doomed_exp = NULL;
1504         struct obd_uuid doomed_uuid;
1505         int exports_evicted = 0;
1506
1507         spin_lock(&obd->obd_dev_lock);
1508         if (obd->obd_stopping) {
1509                 spin_unlock(&obd->obd_dev_lock);
1510                 return exports_evicted;
1511         }
1512         uuid_hash = obd->obd_uuid_hash;
1513         cfs_hash_getref(uuid_hash);
1514         spin_unlock(&obd->obd_dev_lock);
1515
1516         obd_str2uuid(&doomed_uuid, uuid);
1517         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1518                 CERROR("%s: can't evict myself\n", obd->obd_name);
1519                 cfs_hash_putref(uuid_hash);
1520                 return exports_evicted;
1521         }
1522
1523         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1524
1525         if (doomed_exp == NULL) {
1526                 CERROR("%s: can't disconnect %s: no exports found\n",
1527                        obd->obd_name, uuid);
1528         } else {
1529                 CWARN("%s: evicting %s at adminstrative request\n",
1530                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1531                 class_fail_export(doomed_exp);
1532                 class_export_put(doomed_exp);
1533                 exports_evicted++;
1534         }
1535         cfs_hash_putref(uuid_hash);
1536
1537         return exports_evicted;
1538 }
1539 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1540
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump callback; print_export_data() invokes it when
 * lock dumping is requested.  Starts out NULL (zero-initialised). */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1545
1546 static void print_export_data(struct obd_export *exp, const char *status,
1547                               int locks)
1548 {
1549         struct ptlrpc_reply_state *rs;
1550         struct ptlrpc_reply_state *first_reply = NULL;
1551         int nreplies = 0;
1552
1553         spin_lock(&exp->exp_lock);
1554         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1555                                 rs_exp_list) {
1556                 if (nreplies == 0)
1557                         first_reply = rs;
1558                 nreplies++;
1559         }
1560         spin_unlock(&exp->exp_lock);
1561
1562         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1563                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1564                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1565                atomic_read(&exp->exp_rpc_count),
1566                atomic_read(&exp->exp_cb_count),
1567                atomic_read(&exp->exp_locks_count),
1568                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1569                nreplies, first_reply, nreplies > 3 ? "..." : "",
1570                exp->exp_last_committed);
1571 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1572         if (locks && class_export_dump_hook != NULL)
1573                 class_export_dump_hook(exp);
1574 #endif
1575 }
1576
1577 void dump_exports(struct obd_device *obd, int locks)
1578 {
1579         struct obd_export *exp;
1580
1581         spin_lock(&obd->obd_dev_lock);
1582         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1583                 print_export_data(exp, "ACTIVE", locks);
1584         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1585                 print_export_data(exp, "UNLINKED", locks);
1586         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1587                 print_export_data(exp, "DELAYED", locks);
1588         spin_unlock(&obd->obd_dev_lock);
1589         spin_lock(&obd_zombie_impexp_lock);
1590         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1591                 print_export_data(exp, "ZOMBIE", locks);
1592         spin_unlock(&obd_zombie_impexp_lock);
1593 }
1594 EXPORT_SYMBOL(dump_exports);
1595
1596 void obd_exports_barrier(struct obd_device *obd)
1597 {
1598         int waited = 2;
1599         LASSERT(cfs_list_empty(&obd->obd_exports));
1600         spin_lock(&obd->obd_dev_lock);
1601         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1602                 spin_unlock(&obd->obd_dev_lock);
1603                 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1604                                                    cfs_time_seconds(waited));
1605                 if (waited > 5 && IS_PO2(waited)) {
1606                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1607                                       "more than %d seconds. "
1608                                       "The obd refcount = %d. Is it stuck?\n",
1609                                       obd->obd_name, waited,
1610                                       atomic_read(&obd->obd_refcount));
1611                         dump_exports(obd, 1);
1612                 }
1613                 waited *= 2;
1614                 spin_lock(&obd->obd_dev_lock);
1615         }
1616         spin_unlock(&obd->obd_dev_lock);
1617 }
1618 EXPORT_SYMBOL(obd_exports_barrier);
1619
/* Number of zombie imports and exports still waiting to be destroyed;
 * modified under obd_zombie_impexp_lock.  Starts at zero. */
static int zombies_count;
1622
1623 /**
1624  * kill zombie imports and exports
1625  */
1626 void obd_zombie_impexp_cull(void)
1627 {
1628         struct obd_import *import;
1629         struct obd_export *export;
1630         ENTRY;
1631
1632         do {
1633                 spin_lock(&obd_zombie_impexp_lock);
1634
1635                 import = NULL;
1636                 if (!cfs_list_empty(&obd_zombie_imports)) {
1637                         import = cfs_list_entry(obd_zombie_imports.next,
1638                                                 struct obd_import,
1639                                                 imp_zombie_chain);
1640                         cfs_list_del_init(&import->imp_zombie_chain);
1641                 }
1642
1643                 export = NULL;
1644                 if (!cfs_list_empty(&obd_zombie_exports)) {
1645                         export = cfs_list_entry(obd_zombie_exports.next,
1646                                                 struct obd_export,
1647                                                 exp_obd_chain);
1648                         cfs_list_del_init(&export->exp_obd_chain);
1649                 }
1650
1651                 spin_unlock(&obd_zombie_impexp_lock);
1652
1653                 if (import != NULL) {
1654                         class_import_destroy(import);
1655                         spin_lock(&obd_zombie_impexp_lock);
1656                         zombies_count--;
1657                         spin_unlock(&obd_zombie_impexp_lock);
1658                 }
1659
1660                 if (export != NULL) {
1661                         class_export_destroy(export);
1662                         spin_lock(&obd_zombie_impexp_lock);
1663                         zombies_count--;
1664                         spin_unlock(&obd_zombie_impexp_lock);
1665                 }
1666
1667                 cond_resched();
1668         } while (import != NULL || export != NULL);
1669         EXIT;
1670 }
1671
1672 static struct completion        obd_zombie_start;
1673 static struct completion        obd_zombie_stop;
1674 static unsigned long            obd_zombie_flags;
1675 static wait_queue_head_t        obd_zombie_waitq;
1676 static pid_t                    obd_zombie_pid;
1677
1678 enum {
1679         OBD_ZOMBIE_STOP         = 0x0001,
1680 };
1681
1682 /**
1683  * check for work for kill zombie import/export thread.
1684  */
1685 static int obd_zombie_impexp_check(void *arg)
1686 {
1687         int rc;
1688
1689         spin_lock(&obd_zombie_impexp_lock);
1690         rc = (zombies_count == 0) &&
1691              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1692         spin_unlock(&obd_zombie_impexp_lock);
1693
1694         RETURN(rc);
1695 }
1696
1697 /**
1698  * Add export to the obd_zombe thread and notify it.
1699  */
1700 static void obd_zombie_export_add(struct obd_export *exp) {
1701         spin_lock(&exp->exp_obd->obd_dev_lock);
1702         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1703         cfs_list_del_init(&exp->exp_obd_chain);
1704         spin_unlock(&exp->exp_obd->obd_dev_lock);
1705         spin_lock(&obd_zombie_impexp_lock);
1706         zombies_count++;
1707         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1708         spin_unlock(&obd_zombie_impexp_lock);
1709
1710         obd_zombie_impexp_notify();
1711 }
1712
1713 /**
1714  * Add import to the obd_zombe thread and notify it.
1715  */
1716 static void obd_zombie_import_add(struct obd_import *imp) {
1717         LASSERT(imp->imp_sec == NULL);
1718         LASSERT(imp->imp_rq_pool == NULL);
1719         spin_lock(&obd_zombie_impexp_lock);
1720         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1721         zombies_count++;
1722         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1723         spin_unlock(&obd_zombie_impexp_lock);
1724
1725         obd_zombie_impexp_notify();
1726 }
1727
1728 /**
1729  * notify import/export destroy thread about new zombie.
1730  */
1731 static void obd_zombie_impexp_notify(void)
1732 {
1733         /*
1734          * Make sure obd_zomebie_impexp_thread get this notification.
1735          * It is possible this signal only get by obd_zombie_barrier, and
1736          * barrier gulps this notification and sleeps away and hangs ensues
1737          */
1738         wake_up_all(&obd_zombie_waitq);
1739 }
1740
1741 /**
1742  * check whether obd_zombie is idle
1743  */
1744 static int obd_zombie_is_idle(void)
1745 {
1746         int rc;
1747
1748         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1749         spin_lock(&obd_zombie_impexp_lock);
1750         rc = (zombies_count == 0);
1751         spin_unlock(&obd_zombie_impexp_lock);
1752         return rc;
1753 }
1754
1755 /**
1756  * wait when obd_zombie import/export queues become empty
1757  */
1758 void obd_zombie_barrier(void)
1759 {
1760         struct l_wait_info lwi = { 0 };
1761
1762         if (obd_zombie_pid == current_pid())
1763                 /* don't wait for myself */
1764                 return;
1765         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1766 }
1767 EXPORT_SYMBOL(obd_zombie_barrier);
1768
1769 #ifdef __KERNEL__
1770
1771 /**
1772  * destroy zombie export/import thread.
1773  */
1774 static int obd_zombie_impexp_thread(void *unused)
1775 {
1776         unshare_fs_struct();
1777         complete(&obd_zombie_start);
1778
1779         obd_zombie_pid = current_pid();
1780
1781         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1782                 struct l_wait_info lwi = { 0 };
1783
1784                 l_wait_event(obd_zombie_waitq,
1785                              !obd_zombie_impexp_check(NULL), &lwi);
1786                 obd_zombie_impexp_cull();
1787
1788                 /*
1789                  * Notify obd_zombie_barrier callers that queues
1790                  * may be empty.
1791                  */
1792                 wake_up(&obd_zombie_waitq);
1793         }
1794
1795         complete(&obd_zombie_stop);
1796
1797         RETURN(0);
1798 }
1799
1800 #else /* ! KERNEL */
1801
1802 static atomic_t zombie_recur = ATOMIC_INIT(0);
1803 static void *obd_zombie_impexp_work_cb;
1804 static void *obd_zombie_impexp_idle_cb;
1805
1806 int obd_zombie_impexp_kill(void *arg)
1807 {
1808         int rc = 0;
1809
1810         if (atomic_inc_return(&zombie_recur) == 1) {
1811                 obd_zombie_impexp_cull();
1812                 rc = 1;
1813         }
1814         atomic_dec(&zombie_recur);
1815         return rc;
1816 }
1817
1818 #endif
1819
1820 /**
1821  * start destroy zombie import/export thread
1822  */
1823 int obd_zombie_impexp_init(void)
1824 {
1825 #ifdef __KERNEL__
1826         struct task_struct *task;
1827 #endif
1828
1829         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1830         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1831         spin_lock_init(&obd_zombie_impexp_lock);
1832         init_completion(&obd_zombie_start);
1833         init_completion(&obd_zombie_stop);
1834         init_waitqueue_head(&obd_zombie_waitq);
1835         obd_zombie_pid = 0;
1836
1837 #ifdef __KERNEL__
1838         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1839         if (IS_ERR(task))
1840                 RETURN(PTR_ERR(task));
1841
1842         wait_for_completion(&obd_zombie_start);
1843 #else
1844
1845         obd_zombie_impexp_work_cb =
1846                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1847                                                  &obd_zombie_impexp_kill, NULL);
1848
1849         obd_zombie_impexp_idle_cb =
1850                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1851                                                  &obd_zombie_impexp_check, NULL);
1852 #endif
1853         RETURN(0);
1854 }
1855 /**
1856  * stop destroy zombie import/export thread
1857  */
1858 void obd_zombie_impexp_stop(void)
1859 {
1860         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1861         obd_zombie_impexp_notify();
1862 #ifdef __KERNEL__
1863         wait_for_completion(&obd_zombie_stop);
1864 #else
1865         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1866         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1867 #endif
1868 }
1869
1870 /***** Kernel-userspace comm helpers *******/
1871
1872 /* Get length of entire message, including header */
1873 int kuc_len(int payload_len)
1874 {
1875         return sizeof(struct kuc_hdr) + payload_len;
1876 }
1877 EXPORT_SYMBOL(kuc_len);
1878
1879 /* Get a pointer to kuc header, given a ptr to the payload
1880  * @param p Pointer to payload area
1881  * @returns Pointer to kuc header
1882  */
1883 struct kuc_hdr * kuc_ptr(void *p)
1884 {
1885         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1886         LASSERT(lh->kuc_magic == KUC_MAGIC);
1887         return lh;
1888 }
1889 EXPORT_SYMBOL(kuc_ptr);
1890
1891 /* Test if payload is part of kuc message
1892  * @param p Pointer to payload area
1893  * @returns boolean
1894  */
1895 int kuc_ispayload(void *p)
1896 {
1897         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1898
1899         if (kh->kuc_magic == KUC_MAGIC)
1900                 return 1;
1901         else
1902                 return 0;
1903 }
1904 EXPORT_SYMBOL(kuc_ispayload);
1905
1906 /* Alloc space for a message, and fill in header
1907  * @return Pointer to payload area
1908  */
1909 void *kuc_alloc(int payload_len, int transport, int type)
1910 {
1911         struct kuc_hdr *lh;
1912         int len = kuc_len(payload_len);
1913
1914         OBD_ALLOC(lh, len);
1915         if (lh == NULL)
1916                 return ERR_PTR(-ENOMEM);
1917
1918         lh->kuc_magic = KUC_MAGIC;
1919         lh->kuc_transport = transport;
1920         lh->kuc_msgtype = type;
1921         lh->kuc_msglen = len;
1922
1923         return (void *)(lh + 1);
1924 }
1925 EXPORT_SYMBOL(kuc_alloc);
1926
/* Free a kuc message.
 * The header is located with kuc_ptr(), which asserts KUC_MAGIC.
 * @param p Pointer to payload area
 * @param payload_len Payload length; header size is added via kuc_len()
 *
 * Not declared "inline": the function is exported, so it needs an
 * ordinary external definition whatever inline semantics (gnu89 or
 * C99 and later) the compiler applies.
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1934
1935 struct obd_request_slot_waiter {
1936         struct list_head        orsw_entry;
1937         wait_queue_head_t       orsw_waitq;
1938         bool                    orsw_signaled;
1939 };
1940
1941 static bool obd_request_slot_avail(struct client_obd *cli,
1942                                    struct obd_request_slot_waiter *orsw)
1943 {
1944         bool avail;
1945
1946         client_obd_list_lock(&cli->cl_loi_list_lock);
1947         avail = !!list_empty(&orsw->orsw_entry);
1948         client_obd_list_unlock(&cli->cl_loi_list_lock);
1949
1950         return avail;
1951 };
1952
1953 /*
1954  * For network flow control, the RPC sponsor needs to acquire a credit
1955  * before sending the RPC. The credits count for a connection is defined
1956  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1957  * the subsequent RPC sponsors need to wait until others released their
1958  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1959  */
1960 int obd_get_request_slot(struct client_obd *cli)
1961 {
1962         struct obd_request_slot_waiter   orsw;
1963         struct l_wait_info               lwi;
1964         int                              rc;
1965
1966         client_obd_list_lock(&cli->cl_loi_list_lock);
1967         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1968                 cli->cl_r_in_flight++;
1969                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1970                 return 0;
1971         }
1972
1973         init_waitqueue_head(&orsw.orsw_waitq);
1974         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1975         orsw.orsw_signaled = false;
1976         client_obd_list_unlock(&cli->cl_loi_list_lock);
1977
1978         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1979         rc = l_wait_event(orsw.orsw_waitq,
1980                           obd_request_slot_avail(cli, &orsw) ||
1981                           orsw.orsw_signaled,
1982                           &lwi);
1983
1984         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1985          * freed but other (such as obd_put_request_slot) is using it. */
1986         client_obd_list_lock(&cli->cl_loi_list_lock);
1987         if (rc != 0) {
1988                 if (!orsw.orsw_signaled) {
1989                         if (list_empty(&orsw.orsw_entry))
1990                                 cli->cl_r_in_flight--;
1991                         else
1992                                 list_del(&orsw.orsw_entry);
1993                 }
1994         }
1995
1996         if (orsw.orsw_signaled) {
1997                 LASSERT(list_empty(&orsw.orsw_entry));
1998
1999                 rc = -EINTR;
2000         }
2001         client_obd_list_unlock(&cli->cl_loi_list_lock);
2002
2003         return rc;
2004 }
2005 EXPORT_SYMBOL(obd_get_request_slot);
2006
2007 void obd_put_request_slot(struct client_obd *cli)
2008 {
2009         struct obd_request_slot_waiter *orsw;
2010
2011         client_obd_list_lock(&cli->cl_loi_list_lock);
2012         cli->cl_r_in_flight--;
2013
2014         /* If there is free slot, wakeup the first waiter. */
2015         if (!list_empty(&cli->cl_loi_read_list) &&
2016             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2017                 orsw = list_entry(cli->cl_loi_read_list.next,
2018                                   struct obd_request_slot_waiter, orsw_entry);
2019                 list_del_init(&orsw->orsw_entry);
2020                 cli->cl_r_in_flight++;
2021                 wake_up(&orsw->orsw_waitq);
2022         }
2023         client_obd_list_unlock(&cli->cl_loi_list_lock);
2024 }
2025 EXPORT_SYMBOL(obd_put_request_slot);
2026
2027 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2028 {
2029         return cli->cl_max_rpcs_in_flight;
2030 }
2031 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2032
2033 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2034 {
2035         struct obd_request_slot_waiter *orsw;
2036         __u32                           old;
2037         int                             diff;
2038         int                             i;
2039
2040         if (max > OBD_MAX_RIF_MAX || max < 1)
2041                 return -ERANGE;
2042
2043         client_obd_list_lock(&cli->cl_loi_list_lock);
2044         old = cli->cl_max_rpcs_in_flight;
2045         cli->cl_max_rpcs_in_flight = max;
2046         diff = max - old;
2047
2048         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2049         for (i = 0; i < diff; i++) {
2050                 if (list_empty(&cli->cl_loi_read_list))
2051                         break;
2052
2053                 orsw = list_entry(cli->cl_loi_read_list.next,
2054                                   struct obd_request_slot_waiter, orsw_entry);
2055                 list_del_init(&orsw->orsw_entry);
2056                 cli->cl_r_in_flight++;
2057                 wake_up(&orsw->orsw_waitq);
2058         }
2059         client_obd_list_unlock(&cli->cl_loi_list_lock);
2060
2061         return 0;
2062 }
2063 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);