Whamcloud - gitweb
89f7c4b67e0bd28a75ab2838fca19145991caf9b
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include <obd_class.h>
44 #include <lprocfs_status.h>
45 #include <lustre_kernelcomm.h>
46
47 spinlock_t obd_types_lock;
48
49 static struct kmem_cache *obd_device_cachep;
50 struct kmem_cache *obdo_cachep;
51 EXPORT_SYMBOL(obdo_cachep);
52 static struct kmem_cache *import_cachep;
53
54 static struct list_head obd_zombie_imports;
55 static struct list_head obd_zombie_exports;
56 static spinlock_t  obd_zombie_impexp_lock;
57
58 static void obd_zombie_impexp_notify(void);
59 static void obd_zombie_export_add(struct obd_export *exp);
60 static void obd_zombie_import_add(struct obd_import *imp);
61 static void print_export_data(struct obd_export *exp,
62                               const char *status, int locks);
63
64 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
65 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
66
67 /*
68  * support functions: we could use inter-module communication, but this
69  * is more portable to other OS's
70  */
71 static struct obd_device *obd_device_alloc(void)
72 {
73         struct obd_device *obd;
74
75         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
76         if (obd != NULL) {
77                 obd->obd_magic = OBD_DEVICE_MAGIC;
78         }
79         return obd;
80 }
81
82 static void obd_device_free(struct obd_device *obd)
83 {
84         LASSERT(obd != NULL);
85         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
86                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
87         if (obd->obd_namespace != NULL) {
88                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89                        obd, obd->obd_namespace, obd->obd_force);
90                 LBUG();
91         }
92         lu_ref_fini(&obd->obd_reference);
93         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
94 }
95
96 struct obd_type *class_search_type(const char *name)
97 {
98         struct list_head *tmp;
99         struct obd_type *type;
100
101         spin_lock(&obd_types_lock);
102         list_for_each(tmp, &obd_types) {
103                 type = list_entry(tmp, struct obd_type, typ_chain);
104                 if (strcmp(type->typ_name, name) == 0) {
105                         spin_unlock(&obd_types_lock);
106                         return type;
107                 }
108         }
109         spin_unlock(&obd_types_lock);
110         return NULL;
111 }
112 EXPORT_SYMBOL(class_search_type);
113
114 struct obd_type *class_get_type(const char *name)
115 {
116         struct obd_type *type = class_search_type(name);
117
118 #ifdef HAVE_MODULE_LOADING_SUPPORT
119         if (!type) {
120                 const char *modname = name;
121
122                 if (strcmp(modname, "obdfilter") == 0)
123                         modname = "ofd";
124
125                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
126                         modname = LUSTRE_OSP_NAME;
127
128                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
129                         modname = LUSTRE_MDT_NAME;
130
131                 if (!request_module("%s", modname)) {
132                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
133                         type = class_search_type(name);
134                 } else {
135                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
136                                            modname);
137                 }
138         }
139 #endif
140         if (type) {
141                 spin_lock(&type->obd_type_lock);
142                 type->typ_refcnt++;
143                 try_module_get(type->typ_dt_ops->o_owner);
144                 spin_unlock(&type->obd_type_lock);
145         }
146         return type;
147 }
148
149 void class_put_type(struct obd_type *type)
150 {
151         LASSERT(type);
152         spin_lock(&type->obd_type_lock);
153         type->typ_refcnt--;
154         module_put(type->typ_dt_ops->o_owner);
155         spin_unlock(&type->obd_type_lock);
156 }
157
158 #define CLASS_MAX_NAME 1024
159
160 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
161                         bool enable_proc, struct lprocfs_vars *vars,
162                         const char *name, struct lu_device_type *ldt)
163 {
164         struct obd_type *type;
165         int rc = 0;
166         ENTRY;
167
168         /* sanity check */
169         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
170
171         if (class_search_type(name)) {
172                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
173                 RETURN(-EEXIST);
174         }
175
176         rc = -ENOMEM;
177         OBD_ALLOC(type, sizeof(*type));
178         if (type == NULL)
179                 RETURN(rc);
180
181         OBD_ALLOC_PTR(type->typ_dt_ops);
182         OBD_ALLOC_PTR(type->typ_md_ops);
183         OBD_ALLOC(type->typ_name, strlen(name) + 1);
184
185         if (type->typ_dt_ops == NULL ||
186             type->typ_md_ops == NULL ||
187             type->typ_name == NULL)
188                 GOTO (failed, rc);
189
190         *(type->typ_dt_ops) = *dt_ops;
191         /* md_ops is optional */
192         if (md_ops)
193                 *(type->typ_md_ops) = *md_ops;
194         strcpy(type->typ_name, name);
195         spin_lock_init(&type->obd_type_lock);
196
197 #ifdef CONFIG_PROC_FS
198         if (enable_proc) {
199                 type->typ_procroot = lprocfs_register(type->typ_name,
200                                                       proc_lustre_root,
201                                                       vars, type);
202                 if (IS_ERR(type->typ_procroot)) {
203                         rc = PTR_ERR(type->typ_procroot);
204                         type->typ_procroot = NULL;
205                         GOTO(failed, rc);
206                 }
207         }
208 #endif
209         if (ldt != NULL) {
210                 type->typ_lu = ldt;
211                 rc = lu_device_type_init(ldt);
212                 if (rc != 0)
213                         GOTO (failed, rc);
214         }
215
216         spin_lock(&obd_types_lock);
217         list_add(&type->typ_chain, &obd_types);
218         spin_unlock(&obd_types_lock);
219
220         RETURN (0);
221
222 failed:
223         if (type->typ_name != NULL) {
224 #ifdef CONFIG_PROC_FS
225                 if (type->typ_procroot != NULL)
226                         remove_proc_subtree(type->typ_name, proc_lustre_root);
227 #endif
228                 OBD_FREE(type->typ_name, strlen(name) + 1);
229         }
230         if (type->typ_md_ops != NULL)
231                 OBD_FREE_PTR(type->typ_md_ops);
232         if (type->typ_dt_ops != NULL)
233                 OBD_FREE_PTR(type->typ_dt_ops);
234         OBD_FREE(type, sizeof(*type));
235         RETURN(rc);
236 }
237 EXPORT_SYMBOL(class_register_type);
238
239 int class_unregister_type(const char *name)
240 {
241         struct obd_type *type = class_search_type(name);
242         ENTRY;
243
244         if (!type) {
245                 CERROR("unknown obd type\n");
246                 RETURN(-EINVAL);
247         }
248
249         if (type->typ_refcnt) {
250                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
251                 /* This is a bad situation, let's make the best of it */
252                 /* Remove ops, but leave the name for debugging */
253                 OBD_FREE_PTR(type->typ_dt_ops);
254                 OBD_FREE_PTR(type->typ_md_ops);
255                 RETURN(-EBUSY);
256         }
257
258         /* we do not use type->typ_procroot as for compatibility purposes
259          * other modules can share names (i.e. lod can use lov entry). so
260          * we can't reference pointer as it can get invalided when another
261          * module removes the entry */
262 #ifdef CONFIG_PROC_FS
263         if (type->typ_procroot != NULL)
264                 remove_proc_subtree(type->typ_name, proc_lustre_root);
265         if (type->typ_procsym != NULL)
266                 lprocfs_remove(&type->typ_procsym);
267 #endif
268         if (type->typ_lu)
269                 lu_device_type_fini(type->typ_lu);
270
271         spin_lock(&obd_types_lock);
272         list_del(&type->typ_chain);
273         spin_unlock(&obd_types_lock);
274         OBD_FREE(type->typ_name, strlen(name) + 1);
275         if (type->typ_dt_ops != NULL)
276                 OBD_FREE_PTR(type->typ_dt_ops);
277         if (type->typ_md_ops != NULL)
278                 OBD_FREE_PTR(type->typ_md_ops);
279         OBD_FREE(type, sizeof(*type));
280         RETURN(0);
281 } /* class_unregister_type */
282 EXPORT_SYMBOL(class_unregister_type);
283
284 /**
285  * Create a new obd device.
286  *
287  * Find an empty slot in ::obd_devs[], create a new obd device in it.
288  *
289  * \param[in] type_name obd device type string.
290  * \param[in] name      obd device name.
291  *
292  * \retval NULL if create fails, otherwise return the obd device
293  *         pointer created.
294  */
295 struct obd_device *class_newdev(const char *type_name, const char *name)
296 {
297         struct obd_device *result = NULL;
298         struct obd_device *newdev;
299         struct obd_type *type = NULL;
300         int i;
301         int new_obd_minor = 0;
302         ENTRY;
303
304         if (strlen(name) >= MAX_OBD_NAME) {
305                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
306                 RETURN(ERR_PTR(-EINVAL));
307         }
308
309         type = class_get_type(type_name);
310         if (type == NULL){
311                 CERROR("OBD: unknown type: %s\n", type_name);
312                 RETURN(ERR_PTR(-ENODEV));
313         }
314
315         newdev = obd_device_alloc();
316         if (newdev == NULL)
317                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
318
319         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
320
321         write_lock(&obd_dev_lock);
322         for (i = 0; i < class_devno_max(); i++) {
323                 struct obd_device *obd = class_num2obd(i);
324
325                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
326                         CERROR("Device %s already exists at %d, won't add\n",
327                                name, i);
328                         if (result) {
329                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
330                                          "%p obd_magic %08x != %08x\n", result,
331                                          result->obd_magic, OBD_DEVICE_MAGIC);
332                                 LASSERTF(result->obd_minor == new_obd_minor,
333                                          "%p obd_minor %d != %d\n", result,
334                                          result->obd_minor, new_obd_minor);
335
336                                 obd_devs[result->obd_minor] = NULL;
337                                 result->obd_name[0]='\0';
338                          }
339                         result = ERR_PTR(-EEXIST);
340                         break;
341                 }
342                 if (!result && !obd) {
343                         result = newdev;
344                         result->obd_minor = i;
345                         new_obd_minor = i;
346                         result->obd_type = type;
347                         strncpy(result->obd_name, name,
348                                 sizeof(result->obd_name) - 1);
349                         obd_devs[i] = result;
350                 }
351         }
352         write_unlock(&obd_dev_lock);
353
354         if (result == NULL && i >= class_devno_max()) {
355                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
356                        class_devno_max());
357                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
358         }
359
360         if (IS_ERR(result))
361                 GOTO(out, result);
362
363         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
364                result->obd_name, result);
365
366         RETURN(result);
367 out:
368         obd_device_free(newdev);
369 out_type:
370         class_put_type(type);
371         return result;
372 }
373
374 void class_release_dev(struct obd_device *obd)
375 {
376         struct obd_type *obd_type = obd->obd_type;
377
378         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
379                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
380         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
381                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
382         LASSERT(obd_type != NULL);
383
384         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
385                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
386
387         write_lock(&obd_dev_lock);
388         obd_devs[obd->obd_minor] = NULL;
389         write_unlock(&obd_dev_lock);
390         obd_device_free(obd);
391
392         class_put_type(obd_type);
393 }
394
395 int class_name2dev(const char *name)
396 {
397         int i;
398
399         if (!name)
400                 return -1;
401
402         read_lock(&obd_dev_lock);
403         for (i = 0; i < class_devno_max(); i++) {
404                 struct obd_device *obd = class_num2obd(i);
405
406                 if (obd && strcmp(name, obd->obd_name) == 0) {
407                         /* Make sure we finished attaching before we give
408                            out any references */
409                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
410                         if (obd->obd_attached) {
411                                 read_unlock(&obd_dev_lock);
412                                 return i;
413                         }
414                         break;
415                 }
416         }
417         read_unlock(&obd_dev_lock);
418
419         return -1;
420 }
421
422 struct obd_device *class_name2obd(const char *name)
423 {
424         int dev = class_name2dev(name);
425
426         if (dev < 0 || dev > class_devno_max())
427                 return NULL;
428         return class_num2obd(dev);
429 }
430 EXPORT_SYMBOL(class_name2obd);
431
432 int class_uuid2dev(struct obd_uuid *uuid)
433 {
434         int i;
435
436         read_lock(&obd_dev_lock);
437         for (i = 0; i < class_devno_max(); i++) {
438                 struct obd_device *obd = class_num2obd(i);
439
440                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
441                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
442                         read_unlock(&obd_dev_lock);
443                         return i;
444                 }
445         }
446         read_unlock(&obd_dev_lock);
447
448         return -1;
449 }
450
451 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
452 {
453         int dev = class_uuid2dev(uuid);
454         if (dev < 0)
455                 return NULL;
456         return class_num2obd(dev);
457 }
458 EXPORT_SYMBOL(class_uuid2obd);
459
460 /**
461  * Get obd device from ::obd_devs[]
462  *
463  * \param num [in] array index
464  *
465  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
466  *         otherwise return the obd device there.
467  */
468 struct obd_device *class_num2obd(int num)
469 {
470         struct obd_device *obd = NULL;
471
472         if (num < class_devno_max()) {
473                 obd = obd_devs[num];
474                 if (obd == NULL)
475                         return NULL;
476
477                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
478                          "%p obd_magic %08x != %08x\n",
479                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
480                 LASSERTF(obd->obd_minor == num,
481                          "%p obd_minor %0d != %0d\n",
482                          obd, obd->obd_minor, num);
483         }
484
485         return obd;
486 }
487
488 /**
489  * Get obd devices count. Device in any
490  *    state are counted
491  * \retval obd device count
492  */
493 int get_devices_count(void)
494 {
495         int index, max_index = class_devno_max(), dev_count = 0;
496
497         read_lock(&obd_dev_lock);
498         for (index = 0; index <= max_index; index++) {
499                 struct obd_device *obd = class_num2obd(index);
500                 if (obd != NULL)
501                         dev_count++;
502         }
503         read_unlock(&obd_dev_lock);
504
505         return dev_count;
506 }
507 EXPORT_SYMBOL(get_devices_count);
508
509 void class_obd_list(void)
510 {
511         char *status;
512         int i;
513
514         read_lock(&obd_dev_lock);
515         for (i = 0; i < class_devno_max(); i++) {
516                 struct obd_device *obd = class_num2obd(i);
517
518                 if (obd == NULL)
519                         continue;
520                 if (obd->obd_stopping)
521                         status = "ST";
522                 else if (obd->obd_set_up)
523                         status = "UP";
524                 else if (obd->obd_attached)
525                         status = "AT";
526                 else
527                         status = "--";
528                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
529                          i, status, obd->obd_type->typ_name,
530                          obd->obd_name, obd->obd_uuid.uuid,
531                          atomic_read(&obd->obd_refcount));
532         }
533         read_unlock(&obd_dev_lock);
534         return;
535 }
536
537 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
538    specified, then only the client with that uuid is returned,
539    otherwise any client connected to the tgt is returned. */
540 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
541                                           const char * typ_name,
542                                           struct obd_uuid *grp_uuid)
543 {
544         int i;
545
546         read_lock(&obd_dev_lock);
547         for (i = 0; i < class_devno_max(); i++) {
548                 struct obd_device *obd = class_num2obd(i);
549
550                 if (obd == NULL)
551                         continue;
552                 if ((strncmp(obd->obd_type->typ_name, typ_name,
553                              strlen(typ_name)) == 0)) {
554                         if (obd_uuid_equals(tgt_uuid,
555                                             &obd->u.cli.cl_target_uuid) &&
556                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
557                                                          &obd->obd_uuid) : 1)) {
558                                 read_unlock(&obd_dev_lock);
559                                 return obd;
560                         }
561                 }
562         }
563         read_unlock(&obd_dev_lock);
564
565         return NULL;
566 }
567 EXPORT_SYMBOL(class_find_client_obd);
568
569 /* Iterate the obd_device list looking devices have grp_uuid. Start
570    searching at *next, and if a device is found, the next index to look
571    at is saved in *next. If next is NULL, then the first matching device
572    will always be returned. */
573 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
574 {
575         int i;
576
577         if (next == NULL)
578                 i = 0;
579         else if (*next >= 0 && *next < class_devno_max())
580                 i = *next;
581         else
582                 return NULL;
583
584         read_lock(&obd_dev_lock);
585         for (; i < class_devno_max(); i++) {
586                 struct obd_device *obd = class_num2obd(i);
587
588                 if (obd == NULL)
589                         continue;
590                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
591                         if (next != NULL)
592                                 *next = i+1;
593                         read_unlock(&obd_dev_lock);
594                         return obd;
595                 }
596         }
597         read_unlock(&obd_dev_lock);
598
599         return NULL;
600 }
601 EXPORT_SYMBOL(class_devices_in_group);
602
603 /**
604  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
605  * adjust sptlrpc settings accordingly.
606  */
607 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
608 {
609         struct obd_device  *obd;
610         const char         *type;
611         int                 i, rc = 0, rc2;
612
613         LASSERT(namelen > 0);
614
615         read_lock(&obd_dev_lock);
616         for (i = 0; i < class_devno_max(); i++) {
617                 obd = class_num2obd(i);
618
619                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
620                         continue;
621
622                 /* only notify mdc, osc, mdt, ost */
623                 type = obd->obd_type->typ_name;
624                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
625                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
626                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
627                     strcmp(type, LUSTRE_OST_NAME) != 0)
628                         continue;
629
630                 if (strncmp(obd->obd_name, fsname, namelen))
631                         continue;
632
633                 class_incref(obd, __FUNCTION__, obd);
634                 read_unlock(&obd_dev_lock);
635                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
636                                          sizeof(KEY_SPTLRPC_CONF),
637                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
638                 rc = rc ? rc : rc2;
639                 class_decref(obd, __FUNCTION__, obd);
640                 read_lock(&obd_dev_lock);
641         }
642         read_unlock(&obd_dev_lock);
643         return rc;
644 }
645 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
646
647 void obd_cleanup_caches(void)
648 {
649         ENTRY;
650         if (obd_device_cachep) {
651                 kmem_cache_destroy(obd_device_cachep);
652                 obd_device_cachep = NULL;
653         }
654         if (obdo_cachep) {
655                 kmem_cache_destroy(obdo_cachep);
656                 obdo_cachep = NULL;
657         }
658         if (import_cachep) {
659                 kmem_cache_destroy(import_cachep);
660                 import_cachep = NULL;
661         }
662         if (capa_cachep) {
663                 kmem_cache_destroy(capa_cachep);
664                 capa_cachep = NULL;
665         }
666         EXIT;
667 }
668
669 int obd_init_caches(void)
670 {
671         int rc;
672         ENTRY;
673
674         LASSERT(obd_device_cachep == NULL);
675         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
676                                               sizeof(struct obd_device),
677                                               0, 0, NULL);
678         if (!obd_device_cachep)
679                 GOTO(out, rc = -ENOMEM);
680
681         LASSERT(obdo_cachep == NULL);
682         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
683                                         0, 0, NULL);
684         if (!obdo_cachep)
685                 GOTO(out, rc = -ENOMEM);
686
687         LASSERT(import_cachep == NULL);
688         import_cachep = kmem_cache_create("ll_import_cache",
689                                           sizeof(struct obd_import),
690                                           0, 0, NULL);
691         if (!import_cachep)
692                 GOTO(out, rc = -ENOMEM);
693
694         LASSERT(capa_cachep == NULL);
695         capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
696                                         0, 0, NULL);
697         if (!capa_cachep)
698                 GOTO(out, rc = -ENOMEM);
699
700         RETURN(0);
701 out:
702         obd_cleanup_caches();
703         RETURN(rc);
704 }
705
706 /* map connection to client */
707 struct obd_export *class_conn2export(struct lustre_handle *conn)
708 {
709         struct obd_export *export;
710         ENTRY;
711
712         if (!conn) {
713                 CDEBUG(D_CACHE, "looking for null handle\n");
714                 RETURN(NULL);
715         }
716
717         if (conn->cookie == -1) {  /* this means assign a new connection */
718                 CDEBUG(D_CACHE, "want a new connection\n");
719                 RETURN(NULL);
720         }
721
722         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
723         export = class_handle2object(conn->cookie, NULL);
724         RETURN(export);
725 }
726 EXPORT_SYMBOL(class_conn2export);
727
728 struct obd_device *class_exp2obd(struct obd_export *exp)
729 {
730         if (exp)
731                 return exp->exp_obd;
732         return NULL;
733 }
734 EXPORT_SYMBOL(class_exp2obd);
735
736 struct obd_device *class_conn2obd(struct lustre_handle *conn)
737 {
738         struct obd_export *export;
739         export = class_conn2export(conn);
740         if (export) {
741                 struct obd_device *obd = export->exp_obd;
742                 class_export_put(export);
743                 return obd;
744         }
745         return NULL;
746 }
747
748 struct obd_import *class_exp2cliimp(struct obd_export *exp)
749 {
750         struct obd_device *obd = exp->exp_obd;
751         if (obd == NULL)
752                 return NULL;
753         return obd->u.cli.cl_import;
754 }
755 EXPORT_SYMBOL(class_exp2cliimp);
756
757 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
758 {
759         struct obd_device *obd = class_conn2obd(conn);
760         if (obd == NULL)
761                 return NULL;
762         return obd->u.cli.cl_import;
763 }
764
765 /* Export management functions */
766 static void class_export_destroy(struct obd_export *exp)
767 {
768         struct obd_device *obd = exp->exp_obd;
769         ENTRY;
770
771         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
772         LASSERT(obd != NULL);
773
774         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
775                exp->exp_client_uuid.uuid, obd->obd_name);
776
777         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
778         if (exp->exp_connection)
779                 ptlrpc_put_connection_superhack(exp->exp_connection);
780
781         LASSERT(list_empty(&exp->exp_outstanding_replies));
782         LASSERT(list_empty(&exp->exp_uncommitted_replies));
783         LASSERT(list_empty(&exp->exp_req_replay_queue));
784         LASSERT(list_empty(&exp->exp_hp_rpcs));
785         obd_destroy_export(exp);
786         class_decref(obd, "export", exp);
787
788         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
789         EXIT;
790 }
791
792 static void export_handle_addref(void *export)
793 {
794         class_export_get(export);
795 }
796
797 static struct portals_handle_ops export_handle_ops = {
798         .hop_addref = export_handle_addref,
799         .hop_free   = NULL,
800 };
801
802 struct obd_export *class_export_get(struct obd_export *exp)
803 {
804         atomic_inc(&exp->exp_refcount);
805         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
806                atomic_read(&exp->exp_refcount));
807         return exp;
808 }
809 EXPORT_SYMBOL(class_export_get);
810
811 void class_export_put(struct obd_export *exp)
812 {
813         LASSERT(exp != NULL);
814         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
815         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
816                atomic_read(&exp->exp_refcount) - 1);
817
818         if (atomic_dec_and_test(&exp->exp_refcount)) {
819                 LASSERT(!list_empty(&exp->exp_obd_chain));
820                 CDEBUG(D_IOCTL, "final put %p/%s\n",
821                        exp, exp->exp_client_uuid.uuid);
822
823                 /* release nid stat refererence */
824                 lprocfs_exp_cleanup(exp);
825
826                 obd_zombie_export_add(exp);
827         }
828 }
829 EXPORT_SYMBOL(class_export_put);
830
831 /* Creates a new export, adds it to the hash table, and returns a
832  * pointer to it. The refcount is 2: one for the hash reference, and
833  * one for the pointer returned by this function. */
834 struct obd_export *class_new_export(struct obd_device *obd,
835                                     struct obd_uuid *cluuid)
836 {
837         struct obd_export *export;
838         cfs_hash_t *hash = NULL;
839         int rc = 0;
840         ENTRY;
841
842         OBD_ALLOC_PTR(export);
843         if (!export)
844                 return ERR_PTR(-ENOMEM);
845
846         export->exp_conn_cnt = 0;
847         export->exp_lock_hash = NULL;
848         export->exp_flock_hash = NULL;
849         atomic_set(&export->exp_refcount, 2);
850         atomic_set(&export->exp_rpc_count, 0);
851         atomic_set(&export->exp_cb_count, 0);
852         atomic_set(&export->exp_locks_count, 0);
853 #if LUSTRE_TRACKS_LOCK_EXP_REFS
854         INIT_LIST_HEAD(&export->exp_locks_list);
855         spin_lock_init(&export->exp_locks_list_guard);
856 #endif
857         atomic_set(&export->exp_replay_count, 0);
858         export->exp_obd = obd;
859         INIT_LIST_HEAD(&export->exp_outstanding_replies);
860         spin_lock_init(&export->exp_uncommitted_replies_lock);
861         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
862         INIT_LIST_HEAD(&export->exp_req_replay_queue);
863         INIT_LIST_HEAD(&export->exp_handle.h_link);
864         INIT_LIST_HEAD(&export->exp_hp_rpcs);
865         INIT_LIST_HEAD(&export->exp_reg_rpcs);
866         class_handle_hash(&export->exp_handle, &export_handle_ops);
867         export->exp_last_request_time = cfs_time_current_sec();
868         spin_lock_init(&export->exp_lock);
869         spin_lock_init(&export->exp_rpc_lock);
870         INIT_HLIST_NODE(&export->exp_uuid_hash);
871         INIT_HLIST_NODE(&export->exp_nid_hash);
872         spin_lock_init(&export->exp_bl_list_lock);
873         INIT_LIST_HEAD(&export->exp_bl_list);
874
875         export->exp_sp_peer = LUSTRE_SP_ANY;
876         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
877         export->exp_client_uuid = *cluuid;
878         obd_init_export(export);
879
880         spin_lock(&obd->obd_dev_lock);
881         /* shouldn't happen, but might race */
882         if (obd->obd_stopping)
883                 GOTO(exit_unlock, rc = -ENODEV);
884
885         hash = cfs_hash_getref(obd->obd_uuid_hash);
886         if (hash == NULL)
887                 GOTO(exit_unlock, rc = -ENODEV);
888         spin_unlock(&obd->obd_dev_lock);
889
890         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
891                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
892                 if (rc != 0) {
893                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
894                                       obd->obd_name, cluuid->uuid, rc);
895                         GOTO(exit_err, rc = -EALREADY);
896                 }
897         }
898
899         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
900         spin_lock(&obd->obd_dev_lock);
901         if (obd->obd_stopping) {
902                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
903                 GOTO(exit_unlock, rc = -ENODEV);
904         }
905
906         class_incref(obd, "export", export);
907         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
908         list_add_tail(&export->exp_obd_chain_timed,
909                       &export->exp_obd->obd_exports_timed);
910         export->exp_obd->obd_num_exports++;
911         spin_unlock(&obd->obd_dev_lock);
912         cfs_hash_putref(hash);
913         RETURN(export);
914
915 exit_unlock:
916         spin_unlock(&obd->obd_dev_lock);
917 exit_err:
918         if (hash)
919                 cfs_hash_putref(hash);
920         class_handle_unhash(&export->exp_handle);
921         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
922         obd_destroy_export(export);
923         OBD_FREE_PTR(export);
924         return ERR_PTR(rc);
925 }
926 EXPORT_SYMBOL(class_new_export);
927
928 void class_unlink_export(struct obd_export *exp)
929 {
930         class_handle_unhash(&exp->exp_handle);
931
932         spin_lock(&exp->exp_obd->obd_dev_lock);
933         /* delete an uuid-export hashitem from hashtables */
934         if (!hlist_unhashed(&exp->exp_uuid_hash))
935                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
936                              &exp->exp_client_uuid,
937                              &exp->exp_uuid_hash);
938
939         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
940         list_del_init(&exp->exp_obd_chain_timed);
941         exp->exp_obd->obd_num_exports--;
942         spin_unlock(&exp->exp_obd->obd_dev_lock);
943         class_export_put(exp);
944 }
945
946 /* Import management functions */
947 static void class_import_destroy(struct obd_import *imp)
948 {
949         ENTRY;
950
951         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
952                 imp->imp_obd->obd_name);
953
954         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
955
956         ptlrpc_put_connection_superhack(imp->imp_connection);
957
958         while (!list_empty(&imp->imp_conn_list)) {
959                 struct obd_import_conn *imp_conn;
960
961                 imp_conn = list_entry(imp->imp_conn_list.next,
962                                       struct obd_import_conn, oic_item);
963                 list_del_init(&imp_conn->oic_item);
964                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
965                 OBD_FREE(imp_conn, sizeof(*imp_conn));
966         }
967
968         LASSERT(imp->imp_sec == NULL);
969         class_decref(imp->imp_obd, "import", imp);
970         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
971         EXIT;
972 }
973
974 static void import_handle_addref(void *import)
975 {
976         class_import_get(import);
977 }
978
979 static struct portals_handle_ops import_handle_ops = {
980         .hop_addref = import_handle_addref,
981         .hop_free   = NULL,
982 };
983
984 struct obd_import *class_import_get(struct obd_import *import)
985 {
986         atomic_inc(&import->imp_refcount);
987         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
988                atomic_read(&import->imp_refcount),
989                import->imp_obd->obd_name);
990         return import;
991 }
992 EXPORT_SYMBOL(class_import_get);
993
994 void class_import_put(struct obd_import *imp)
995 {
996         ENTRY;
997
998         LASSERT(list_empty(&imp->imp_zombie_chain));
999         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1000
1001         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1002                atomic_read(&imp->imp_refcount) - 1,
1003                imp->imp_obd->obd_name);
1004
1005         if (atomic_dec_and_test(&imp->imp_refcount)) {
1006                 CDEBUG(D_INFO, "final put import %p\n", imp);
1007                 obd_zombie_import_add(imp);
1008         }
1009
1010         /* catch possible import put race */
1011         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1012         EXIT;
1013 }
1014 EXPORT_SYMBOL(class_import_put);
1015
1016 static void init_imp_at(struct imp_at *at) {
1017         int i;
1018         at_init(&at->iat_net_latency, 0, 0);
1019         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1020                 /* max service estimates are tracked on the server side, so
1021                    don't use the AT history here, just use the last reported
1022                    val. (But keep hist for proc histogram, worst_ever) */
1023                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1024                         AT_FLG_NOHIST);
1025         }
1026 }
1027
1028 struct obd_import *class_new_import(struct obd_device *obd)
1029 {
1030         struct obd_import *imp;
1031
1032         OBD_ALLOC(imp, sizeof(*imp));
1033         if (imp == NULL)
1034                 return NULL;
1035
1036         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1037         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1038         INIT_LIST_HEAD(&imp->imp_replay_list);
1039         INIT_LIST_HEAD(&imp->imp_sending_list);
1040         INIT_LIST_HEAD(&imp->imp_delayed_list);
1041         INIT_LIST_HEAD(&imp->imp_committed_list);
1042         imp->imp_replay_cursor = &imp->imp_committed_list;
1043         spin_lock_init(&imp->imp_lock);
1044         imp->imp_last_success_conn = 0;
1045         imp->imp_state = LUSTRE_IMP_NEW;
1046         imp->imp_obd = class_incref(obd, "import", imp);
1047         mutex_init(&imp->imp_sec_mutex);
1048         init_waitqueue_head(&imp->imp_recovery_waitq);
1049
1050         atomic_set(&imp->imp_refcount, 2);
1051         atomic_set(&imp->imp_unregistering, 0);
1052         atomic_set(&imp->imp_inflight, 0);
1053         atomic_set(&imp->imp_replay_inflight, 0);
1054         atomic_set(&imp->imp_inval_count, 0);
1055         INIT_LIST_HEAD(&imp->imp_conn_list);
1056         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1057         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1058         init_imp_at(&imp->imp_at);
1059
1060         /* the default magic is V2, will be used in connect RPC, and
1061          * then adjusted according to the flags in request/reply. */
1062         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1063
1064         return imp;
1065 }
1066 EXPORT_SYMBOL(class_new_import);
1067
1068 void class_destroy_import(struct obd_import *import)
1069 {
1070         LASSERT(import != NULL);
1071         LASSERT(import != LP_POISON);
1072
1073         class_handle_unhash(&import->imp_handle);
1074
1075         spin_lock(&import->imp_lock);
1076         import->imp_generation++;
1077         spin_unlock(&import->imp_lock);
1078         class_import_put(import);
1079 }
1080 EXPORT_SYMBOL(class_destroy_import);
1081
1082 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1083
1084 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1085 {
1086         spin_lock(&exp->exp_locks_list_guard);
1087
1088         LASSERT(lock->l_exp_refs_nr >= 0);
1089
1090         if (lock->l_exp_refs_target != NULL &&
1091             lock->l_exp_refs_target != exp) {
1092                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1093                               exp, lock, lock->l_exp_refs_target);
1094         }
1095         if ((lock->l_exp_refs_nr ++) == 0) {
1096                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1097                 lock->l_exp_refs_target = exp;
1098         }
1099         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1100                lock, exp, lock->l_exp_refs_nr);
1101         spin_unlock(&exp->exp_locks_list_guard);
1102 }
1103
1104 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1105 {
1106         spin_lock(&exp->exp_locks_list_guard);
1107         LASSERT(lock->l_exp_refs_nr > 0);
1108         if (lock->l_exp_refs_target != exp) {
1109                 LCONSOLE_WARN("lock %p, "
1110                               "mismatching export pointers: %p, %p\n",
1111                               lock, lock->l_exp_refs_target, exp);
1112         }
1113         if (-- lock->l_exp_refs_nr == 0) {
1114                 list_del_init(&lock->l_exp_refs_link);
1115                 lock->l_exp_refs_target = NULL;
1116         }
1117         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1118                lock, exp, lock->l_exp_refs_nr);
1119         spin_unlock(&exp->exp_locks_list_guard);
1120 }
1121 #endif
1122
1123 /* A connection defines an export context in which preallocation can
1124    be managed. This releases the export pointer reference, and returns
1125    the export handle, so the export refcount is 1 when this function
1126    returns. */
1127 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1128                   struct obd_uuid *cluuid)
1129 {
1130         struct obd_export *export;
1131         LASSERT(conn != NULL);
1132         LASSERT(obd != NULL);
1133         LASSERT(cluuid != NULL);
1134         ENTRY;
1135
1136         export = class_new_export(obd, cluuid);
1137         if (IS_ERR(export))
1138                 RETURN(PTR_ERR(export));
1139
1140         conn->cookie = export->exp_handle.h_cookie;
1141         class_export_put(export);
1142
1143         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1144                cluuid->uuid, conn->cookie);
1145         RETURN(0);
1146 }
1147 EXPORT_SYMBOL(class_connect);
1148
1149 /* if export is involved in recovery then clean up related things */
1150 static void class_export_recovery_cleanup(struct obd_export *exp)
1151 {
1152         struct obd_device *obd = exp->exp_obd;
1153
1154         spin_lock(&obd->obd_recovery_task_lock);
1155         if (obd->obd_recovering) {
1156                 if (exp->exp_in_recovery) {
1157                         spin_lock(&exp->exp_lock);
1158                         exp->exp_in_recovery = 0;
1159                         spin_unlock(&exp->exp_lock);
1160                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1161                         atomic_dec(&obd->obd_connected_clients);
1162                 }
1163
1164                 /* if called during recovery then should update
1165                  * obd_stale_clients counter,
1166                  * lightweight exports are not counted */
1167                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1168                         exp->exp_obd->obd_stale_clients++;
1169         }
1170         spin_unlock(&obd->obd_recovery_task_lock);
1171
1172         spin_lock(&exp->exp_lock);
1173         /** Cleanup req replay fields */
1174         if (exp->exp_req_replay_needed) {
1175                 exp->exp_req_replay_needed = 0;
1176
1177                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1178                 atomic_dec(&obd->obd_req_replay_clients);
1179         }
1180
1181         /** Cleanup lock replay data */
1182         if (exp->exp_lock_replay_needed) {
1183                 exp->exp_lock_replay_needed = 0;
1184
1185                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1186                 atomic_dec(&obd->obd_lock_replay_clients);
1187         }
1188         spin_unlock(&exp->exp_lock);
1189 }
1190
1191 /* This function removes 1-3 references from the export:
1192  * 1 - for export pointer passed
1193  * and if disconnect really need
1194  * 2 - removing from hash
1195  * 3 - in client_unlink_export
1196  * The export pointer passed to this function can destroyed */
1197 int class_disconnect(struct obd_export *export)
1198 {
1199         int already_disconnected;
1200         ENTRY;
1201
1202         if (export == NULL) {
1203                 CWARN("attempting to free NULL export %p\n", export);
1204                 RETURN(-EINVAL);
1205         }
1206
1207         spin_lock(&export->exp_lock);
1208         already_disconnected = export->exp_disconnected;
1209         export->exp_disconnected = 1;
1210         spin_unlock(&export->exp_lock);
1211
1212         /* class_cleanup(), abort_recovery(), and class_fail_export()
1213          * all end up in here, and if any of them race we shouldn't
1214          * call extra class_export_puts(). */
1215         if (already_disconnected) {
1216                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1217                 GOTO(no_disconn, already_disconnected);
1218         }
1219
1220         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1221                export->exp_handle.h_cookie);
1222
1223         if (!hlist_unhashed(&export->exp_nid_hash))
1224                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1225                              &export->exp_connection->c_peer.nid,
1226                              &export->exp_nid_hash);
1227
1228         class_export_recovery_cleanup(export);
1229         class_unlink_export(export);
1230 no_disconn:
1231         class_export_put(export);
1232         RETURN(0);
1233 }
1234 EXPORT_SYMBOL(class_disconnect);
1235
1236 /* Return non-zero for a fully connected export */
1237 int class_connected_export(struct obd_export *exp)
1238 {
1239         int connected = 0;
1240
1241         if (exp) {
1242                 spin_lock(&exp->exp_lock);
1243                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1244                 spin_unlock(&exp->exp_lock);
1245         }
1246         return connected;
1247 }
1248 EXPORT_SYMBOL(class_connected_export);
1249
1250 static void class_disconnect_export_list(struct list_head *list,
1251                                          enum obd_option flags)
1252 {
1253         int rc;
1254         struct obd_export *exp;
1255         ENTRY;
1256
1257         /* It's possible that an export may disconnect itself, but
1258          * nothing else will be added to this list. */
1259         while (!list_empty(list)) {
1260                 exp = list_entry(list->next, struct obd_export,
1261                                  exp_obd_chain);
1262                 /* need for safe call CDEBUG after obd_disconnect */
1263                 class_export_get(exp);
1264
1265                 spin_lock(&exp->exp_lock);
1266                 exp->exp_flags = flags;
1267                 spin_unlock(&exp->exp_lock);
1268
1269                 if (obd_uuid_equals(&exp->exp_client_uuid,
1270                                     &exp->exp_obd->obd_uuid)) {
1271                         CDEBUG(D_HA,
1272                                "exp %p export uuid == obd uuid, don't discon\n",
1273                                exp);
1274                         /* Need to delete this now so we don't end up pointing
1275                          * to work_list later when this export is cleaned up. */
1276                         list_del_init(&exp->exp_obd_chain);
1277                         class_export_put(exp);
1278                         continue;
1279                 }
1280
1281                 class_export_get(exp);
1282                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1283                        "last request at "CFS_TIME_T"\n",
1284                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1285                        exp, exp->exp_last_request_time);
1286                 /* release one export reference anyway */
1287                 rc = obd_disconnect(exp);
1288
1289                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1290                        obd_export_nid2str(exp), exp, rc);
1291                 class_export_put(exp);
1292         }
1293         EXIT;
1294 }
1295
1296 void class_disconnect_exports(struct obd_device *obd)
1297 {
1298         struct list_head work_list;
1299         ENTRY;
1300
1301         /* Move all of the exports from obd_exports to a work list, en masse. */
1302         INIT_LIST_HEAD(&work_list);
1303         spin_lock(&obd->obd_dev_lock);
1304         list_splice_init(&obd->obd_exports, &work_list);
1305         list_splice_init(&obd->obd_delayed_exports, &work_list);
1306         spin_unlock(&obd->obd_dev_lock);
1307
1308         if (!list_empty(&work_list)) {
1309                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1310                        "disconnecting them\n", obd->obd_minor, obd);
1311                 class_disconnect_export_list(&work_list,
1312                                              exp_flags_from_obd(obd));
1313         } else
1314                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1315                        obd->obd_minor, obd);
1316         EXIT;
1317 }
1318 EXPORT_SYMBOL(class_disconnect_exports);
1319
1320 /* Remove exports that have not completed recovery.
1321  */
1322 void class_disconnect_stale_exports(struct obd_device *obd,
1323                                     int (*test_export)(struct obd_export *))
1324 {
1325         struct list_head work_list;
1326         struct obd_export *exp, *n;
1327         int evicted = 0;
1328         ENTRY;
1329
1330         INIT_LIST_HEAD(&work_list);
1331         spin_lock(&obd->obd_dev_lock);
1332         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1333                                  exp_obd_chain) {
1334                 /* don't count self-export as client */
1335                 if (obd_uuid_equals(&exp->exp_client_uuid,
1336                                     &exp->exp_obd->obd_uuid))
1337                         continue;
1338
1339                 /* don't evict clients which have no slot in last_rcvd
1340                  * (e.g. lightweight connection) */
1341                 if (exp->exp_target_data.ted_lr_idx == -1)
1342                         continue;
1343
1344                 spin_lock(&exp->exp_lock);
1345                 if (exp->exp_failed || test_export(exp)) {
1346                         spin_unlock(&exp->exp_lock);
1347                         continue;
1348                 }
1349                 exp->exp_failed = 1;
1350                 spin_unlock(&exp->exp_lock);
1351
1352                 list_move(&exp->exp_obd_chain, &work_list);
1353                 evicted++;
1354                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1355                        obd->obd_name, exp->exp_client_uuid.uuid,
1356                        exp->exp_connection == NULL ? "<unknown>" :
1357                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1358                 print_export_data(exp, "EVICTING", 0);
1359         }
1360         spin_unlock(&obd->obd_dev_lock);
1361
1362         if (evicted)
1363                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1364                               obd->obd_name, evicted);
1365
1366         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1367                                                  OBD_OPT_ABORT_RECOV);
1368         EXIT;
1369 }
1370 EXPORT_SYMBOL(class_disconnect_stale_exports);
1371
1372 void class_fail_export(struct obd_export *exp)
1373 {
1374         int rc, already_failed;
1375
1376         spin_lock(&exp->exp_lock);
1377         already_failed = exp->exp_failed;
1378         exp->exp_failed = 1;
1379         spin_unlock(&exp->exp_lock);
1380
1381         if (already_failed) {
1382                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1383                        exp, exp->exp_client_uuid.uuid);
1384                 return;
1385         }
1386
1387         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1388                exp, exp->exp_client_uuid.uuid);
1389
1390         if (obd_dump_on_timeout)
1391                 libcfs_debug_dumplog();
1392
1393         /* need for safe call CDEBUG after obd_disconnect */
1394         class_export_get(exp);
1395
1396         /* Most callers into obd_disconnect are removing their own reference
1397          * (request, for example) in addition to the one from the hash table.
1398          * We don't have such a reference here, so make one. */
1399         class_export_get(exp);
1400         rc = obd_disconnect(exp);
1401         if (rc)
1402                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1403         else
1404                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1405                        exp, exp->exp_client_uuid.uuid);
1406         class_export_put(exp);
1407 }
1408 EXPORT_SYMBOL(class_fail_export);
1409
1410 char *obd_export_nid2str(struct obd_export *exp)
1411 {
1412         if (exp->exp_connection != NULL)
1413                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1414
1415         return "(no nid)";
1416 }
1417 EXPORT_SYMBOL(obd_export_nid2str);
1418
1419 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1420 {
1421         cfs_hash_t *nid_hash;
1422         struct obd_export *doomed_exp = NULL;
1423         int exports_evicted = 0;
1424
1425         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1426
1427         spin_lock(&obd->obd_dev_lock);
1428         /* umount has run already, so evict thread should leave
1429          * its task to umount thread now */
1430         if (obd->obd_stopping) {
1431                 spin_unlock(&obd->obd_dev_lock);
1432                 return exports_evicted;
1433         }
1434         nid_hash = obd->obd_nid_hash;
1435         cfs_hash_getref(nid_hash);
1436         spin_unlock(&obd->obd_dev_lock);
1437
1438         do {
1439                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1440                 if (doomed_exp == NULL)
1441                         break;
1442
1443                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1444                          "nid %s found, wanted nid %s, requested nid %s\n",
1445                          obd_export_nid2str(doomed_exp),
1446                          libcfs_nid2str(nid_key), nid);
1447                 LASSERTF(doomed_exp != obd->obd_self_export,
1448                          "self-export is hashed by NID?\n");
1449                 exports_evicted++;
1450                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1451                               "request\n", obd->obd_name,
1452                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1453                               obd_export_nid2str(doomed_exp));
1454                 class_fail_export(doomed_exp);
1455                 class_export_put(doomed_exp);
1456         } while (1);
1457
1458         cfs_hash_putref(nid_hash);
1459
1460         if (!exports_evicted)
1461                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1462                        obd->obd_name, nid);
1463         return exports_evicted;
1464 }
1465 EXPORT_SYMBOL(obd_export_evict_by_nid);
1466
1467 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1468 {
1469         cfs_hash_t *uuid_hash;
1470         struct obd_export *doomed_exp = NULL;
1471         struct obd_uuid doomed_uuid;
1472         int exports_evicted = 0;
1473
1474         spin_lock(&obd->obd_dev_lock);
1475         if (obd->obd_stopping) {
1476                 spin_unlock(&obd->obd_dev_lock);
1477                 return exports_evicted;
1478         }
1479         uuid_hash = obd->obd_uuid_hash;
1480         cfs_hash_getref(uuid_hash);
1481         spin_unlock(&obd->obd_dev_lock);
1482
1483         obd_str2uuid(&doomed_uuid, uuid);
1484         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1485                 CERROR("%s: can't evict myself\n", obd->obd_name);
1486                 cfs_hash_putref(uuid_hash);
1487                 return exports_evicted;
1488         }
1489
1490         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1491
1492         if (doomed_exp == NULL) {
1493                 CERROR("%s: can't disconnect %s: no exports found\n",
1494                        obd->obd_name, uuid);
1495         } else {
1496                 CWARN("%s: evicting %s at adminstrative request\n",
1497                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1498                 class_fail_export(doomed_exp);
1499                 class_export_put(doomed_exp);
1500                 exports_evicted++;
1501         }
1502         cfs_hash_putref(uuid_hash);
1503
1504         return exports_evicted;
1505 }
1506
1507 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1508 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1509 #endif
1510
1511 static void print_export_data(struct obd_export *exp, const char *status,
1512                               int locks)
1513 {
1514         struct ptlrpc_reply_state *rs;
1515         struct ptlrpc_reply_state *first_reply = NULL;
1516         int nreplies = 0;
1517
1518         spin_lock(&exp->exp_lock);
1519         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1520                             rs_exp_list) {
1521                 if (nreplies == 0)
1522                         first_reply = rs;
1523                 nreplies++;
1524         }
1525         spin_unlock(&exp->exp_lock);
1526
1527         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1528                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1529                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1530                atomic_read(&exp->exp_rpc_count),
1531                atomic_read(&exp->exp_cb_count),
1532                atomic_read(&exp->exp_locks_count),
1533                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1534                nreplies, first_reply, nreplies > 3 ? "..." : "",
1535                exp->exp_last_committed);
1536 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1537         if (locks && class_export_dump_hook != NULL)
1538                 class_export_dump_hook(exp);
1539 #endif
1540 }
1541
1542 void dump_exports(struct obd_device *obd, int locks)
1543 {
1544         struct obd_export *exp;
1545
1546         spin_lock(&obd->obd_dev_lock);
1547         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1548                 print_export_data(exp, "ACTIVE", locks);
1549         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1550                 print_export_data(exp, "UNLINKED", locks);
1551         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1552                 print_export_data(exp, "DELAYED", locks);
1553         spin_unlock(&obd->obd_dev_lock);
1554         spin_lock(&obd_zombie_impexp_lock);
1555         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1556                 print_export_data(exp, "ZOMBIE", locks);
1557         spin_unlock(&obd_zombie_impexp_lock);
1558 }
1559
1560 void obd_exports_barrier(struct obd_device *obd)
1561 {
1562         int waited = 2;
1563         LASSERT(list_empty(&obd->obd_exports));
1564         spin_lock(&obd->obd_dev_lock);
1565         while (!list_empty(&obd->obd_unlinked_exports)) {
1566                 spin_unlock(&obd->obd_dev_lock);
1567                 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1568                                                    cfs_time_seconds(waited));
1569                 if (waited > 5 && IS_PO2(waited)) {
1570                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1571                                       "more than %d seconds. "
1572                                       "The obd refcount = %d. Is it stuck?\n",
1573                                       obd->obd_name, waited,
1574                                       atomic_read(&obd->obd_refcount));
1575                         dump_exports(obd, 1);
1576                 }
1577                 waited *= 2;
1578                 spin_lock(&obd->obd_dev_lock);
1579         }
1580         spin_unlock(&obd->obd_dev_lock);
1581 }
1582 EXPORT_SYMBOL(obd_exports_barrier);
1583
1584 /* Total amount of zombies to be destroyed */
1585 static int zombies_count = 0;
1586
1587 /**
1588  * kill zombie imports and exports
1589  */
1590 void obd_zombie_impexp_cull(void)
1591 {
1592         struct obd_import *import;
1593         struct obd_export *export;
1594         ENTRY;
1595
1596         do {
1597                 spin_lock(&obd_zombie_impexp_lock);
1598
1599                 import = NULL;
1600                 if (!list_empty(&obd_zombie_imports)) {
1601                         import = list_entry(obd_zombie_imports.next,
1602                                             struct obd_import,
1603                                             imp_zombie_chain);
1604                         list_del_init(&import->imp_zombie_chain);
1605                 }
1606
1607                 export = NULL;
1608                 if (!list_empty(&obd_zombie_exports)) {
1609                         export = list_entry(obd_zombie_exports.next,
1610                                             struct obd_export,
1611                                             exp_obd_chain);
1612                         list_del_init(&export->exp_obd_chain);
1613                 }
1614
1615                 spin_unlock(&obd_zombie_impexp_lock);
1616
1617                 if (import != NULL) {
1618                         class_import_destroy(import);
1619                         spin_lock(&obd_zombie_impexp_lock);
1620                         zombies_count--;
1621                         spin_unlock(&obd_zombie_impexp_lock);
1622                 }
1623
1624                 if (export != NULL) {
1625                         class_export_destroy(export);
1626                         spin_lock(&obd_zombie_impexp_lock);
1627                         zombies_count--;
1628                         spin_unlock(&obd_zombie_impexp_lock);
1629                 }
1630
1631                 cond_resched();
1632         } while (import != NULL || export != NULL);
1633         EXIT;
1634 }
1635
1636 static struct completion        obd_zombie_start;
1637 static struct completion        obd_zombie_stop;
1638 static unsigned long            obd_zombie_flags;
1639 static wait_queue_head_t        obd_zombie_waitq;
1640 static pid_t                    obd_zombie_pid;
1641
1642 enum {
1643         OBD_ZOMBIE_STOP         = 0x0001,
1644 };
1645
1646 /**
1647  * check for work for kill zombie import/export thread.
1648  */
1649 static int obd_zombie_impexp_check(void *arg)
1650 {
1651         int rc;
1652
1653         spin_lock(&obd_zombie_impexp_lock);
1654         rc = (zombies_count == 0) &&
1655              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1656         spin_unlock(&obd_zombie_impexp_lock);
1657
1658         RETURN(rc);
1659 }
1660
1661 /**
1662  * Add export to the obd_zombe thread and notify it.
1663  */
1664 static void obd_zombie_export_add(struct obd_export *exp) {
1665         spin_lock(&exp->exp_obd->obd_dev_lock);
1666         LASSERT(!list_empty(&exp->exp_obd_chain));
1667         list_del_init(&exp->exp_obd_chain);
1668         spin_unlock(&exp->exp_obd->obd_dev_lock);
1669         spin_lock(&obd_zombie_impexp_lock);
1670         zombies_count++;
1671         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1672         spin_unlock(&obd_zombie_impexp_lock);
1673
1674         obd_zombie_impexp_notify();
1675 }
1676
1677 /**
1678  * Add import to the obd_zombe thread and notify it.
1679  */
1680 static void obd_zombie_import_add(struct obd_import *imp) {
1681         LASSERT(imp->imp_sec == NULL);
1682         LASSERT(imp->imp_rq_pool == NULL);
1683         spin_lock(&obd_zombie_impexp_lock);
1684         LASSERT(list_empty(&imp->imp_zombie_chain));
1685         zombies_count++;
1686         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1687         spin_unlock(&obd_zombie_impexp_lock);
1688
1689         obd_zombie_impexp_notify();
1690 }
1691
1692 /**
1693  * notify import/export destroy thread about new zombie.
1694  */
1695 static void obd_zombie_impexp_notify(void)
1696 {
1697         /*
1698          * Make sure obd_zomebie_impexp_thread get this notification.
1699          * It is possible this signal only get by obd_zombie_barrier, and
1700          * barrier gulps this notification and sleeps away and hangs ensues
1701          */
1702         wake_up_all(&obd_zombie_waitq);
1703 }
1704
1705 /**
1706  * check whether obd_zombie is idle
1707  */
1708 static int obd_zombie_is_idle(void)
1709 {
1710         int rc;
1711
1712         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1713         spin_lock(&obd_zombie_impexp_lock);
1714         rc = (zombies_count == 0);
1715         spin_unlock(&obd_zombie_impexp_lock);
1716         return rc;
1717 }
1718
1719 /**
1720  * wait when obd_zombie import/export queues become empty
1721  */
1722 void obd_zombie_barrier(void)
1723 {
1724         struct l_wait_info lwi = { 0 };
1725
1726         if (obd_zombie_pid == current_pid())
1727                 /* don't wait for myself */
1728                 return;
1729         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1730 }
1731 EXPORT_SYMBOL(obd_zombie_barrier);
1732
1733
1734 /**
1735  * destroy zombie export/import thread.
1736  */
1737 static int obd_zombie_impexp_thread(void *unused)
1738 {
1739         unshare_fs_struct();
1740         complete(&obd_zombie_start);
1741
1742         obd_zombie_pid = current_pid();
1743
1744         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1745                 struct l_wait_info lwi = { 0 };
1746
1747                 l_wait_event(obd_zombie_waitq,
1748                              !obd_zombie_impexp_check(NULL), &lwi);
1749                 obd_zombie_impexp_cull();
1750
1751                 /*
1752                  * Notify obd_zombie_barrier callers that queues
1753                  * may be empty.
1754                  */
1755                 wake_up(&obd_zombie_waitq);
1756         }
1757
1758         complete(&obd_zombie_stop);
1759
1760         RETURN(0);
1761 }
1762
1763
1764 /**
1765  * start destroy zombie import/export thread
1766  */
1767 int obd_zombie_impexp_init(void)
1768 {
1769         struct task_struct *task;
1770
1771         INIT_LIST_HEAD(&obd_zombie_imports);
1772
1773         INIT_LIST_HEAD(&obd_zombie_exports);
1774         spin_lock_init(&obd_zombie_impexp_lock);
1775         init_completion(&obd_zombie_start);
1776         init_completion(&obd_zombie_stop);
1777         init_waitqueue_head(&obd_zombie_waitq);
1778         obd_zombie_pid = 0;
1779
1780         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1781         if (IS_ERR(task))
1782                 RETURN(PTR_ERR(task));
1783
1784         wait_for_completion(&obd_zombie_start);
1785         RETURN(0);
1786 }
1787 /**
1788  * stop destroy zombie import/export thread
1789  */
1790 void obd_zombie_impexp_stop(void)
1791 {
1792         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1793         obd_zombie_impexp_notify();
1794         wait_for_completion(&obd_zombie_stop);
1795 }
1796
1797 /***** Kernel-userspace comm helpers *******/
1798
1799 /* Get length of entire message, including header */
1800 int kuc_len(int payload_len)
1801 {
1802         return sizeof(struct kuc_hdr) + payload_len;
1803 }
1804 EXPORT_SYMBOL(kuc_len);
1805
1806 /* Get a pointer to kuc header, given a ptr to the payload
1807  * @param p Pointer to payload area
1808  * @returns Pointer to kuc header
1809  */
1810 struct kuc_hdr * kuc_ptr(void *p)
1811 {
1812         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1813         LASSERT(lh->kuc_magic == KUC_MAGIC);
1814         return lh;
1815 }
1816 EXPORT_SYMBOL(kuc_ptr);
1817
1818 /* Test if payload is part of kuc message
1819  * @param p Pointer to payload area
1820  * @returns boolean
1821  */
1822 int kuc_ispayload(void *p)
1823 {
1824         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1825
1826         if (kh->kuc_magic == KUC_MAGIC)
1827                 return 1;
1828         else
1829                 return 0;
1830 }
1831 EXPORT_SYMBOL(kuc_ispayload);
1832
1833 /* Alloc space for a message, and fill in header
1834  * @return Pointer to payload area
1835  */
1836 void *kuc_alloc(int payload_len, int transport, int type)
1837 {
1838         struct kuc_hdr *lh;
1839         int len = kuc_len(payload_len);
1840
1841         OBD_ALLOC(lh, len);
1842         if (lh == NULL)
1843                 return ERR_PTR(-ENOMEM);
1844
1845         lh->kuc_magic = KUC_MAGIC;
1846         lh->kuc_transport = transport;
1847         lh->kuc_msgtype = type;
1848         lh->kuc_msglen = len;
1849
1850         return (void *)(lh + 1);
1851 }
1852 EXPORT_SYMBOL(kuc_alloc);
1853
1854 /* Takes pointer to payload area */
1855 inline void kuc_free(void *p, int payload_len)
1856 {
1857         struct kuc_hdr *lh = kuc_ptr(p);
1858         OBD_FREE(lh, kuc_len(payload_len));
1859 }
1860 EXPORT_SYMBOL(kuc_free);
1861
1862 struct obd_request_slot_waiter {
1863         struct list_head        orsw_entry;
1864         wait_queue_head_t       orsw_waitq;
1865         bool                    orsw_signaled;
1866 };
1867
1868 static bool obd_request_slot_avail(struct client_obd *cli,
1869                                    struct obd_request_slot_waiter *orsw)
1870 {
1871         bool avail;
1872
1873         spin_lock(&cli->cl_loi_list_lock);
1874         avail = !!list_empty(&orsw->orsw_entry);
1875         spin_unlock(&cli->cl_loi_list_lock);
1876
1877         return avail;
1878 };
1879
1880 /*
1881  * For network flow control, the RPC sponsor needs to acquire a credit
1882  * before sending the RPC. The credits count for a connection is defined
1883  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1884  * the subsequent RPC sponsors need to wait until others released their
1885  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1886  */
1887 int obd_get_request_slot(struct client_obd *cli)
1888 {
1889         struct obd_request_slot_waiter   orsw;
1890         struct l_wait_info               lwi;
1891         int                              rc;
1892
1893         spin_lock(&cli->cl_loi_list_lock);
1894         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1895                 cli->cl_r_in_flight++;
1896                 spin_unlock(&cli->cl_loi_list_lock);
1897                 return 0;
1898         }
1899
1900         init_waitqueue_head(&orsw.orsw_waitq);
1901         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1902         orsw.orsw_signaled = false;
1903         spin_unlock(&cli->cl_loi_list_lock);
1904
1905         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1906         rc = l_wait_event(orsw.orsw_waitq,
1907                           obd_request_slot_avail(cli, &orsw) ||
1908                           orsw.orsw_signaled,
1909                           &lwi);
1910
1911         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1912          * freed but other (such as obd_put_request_slot) is using it. */
1913         spin_lock(&cli->cl_loi_list_lock);
1914         if (rc != 0) {
1915                 if (!orsw.orsw_signaled) {
1916                         if (list_empty(&orsw.orsw_entry))
1917                                 cli->cl_r_in_flight--;
1918                         else
1919                                 list_del(&orsw.orsw_entry);
1920                 }
1921         }
1922
1923         if (orsw.orsw_signaled) {
1924                 LASSERT(list_empty(&orsw.orsw_entry));
1925
1926                 rc = -EINTR;
1927         }
1928         spin_unlock(&cli->cl_loi_list_lock);
1929
1930         return rc;
1931 }
1932 EXPORT_SYMBOL(obd_get_request_slot);
1933
1934 void obd_put_request_slot(struct client_obd *cli)
1935 {
1936         struct obd_request_slot_waiter *orsw;
1937
1938         spin_lock(&cli->cl_loi_list_lock);
1939         cli->cl_r_in_flight--;
1940
1941         /* If there is free slot, wakeup the first waiter. */
1942         if (!list_empty(&cli->cl_loi_read_list) &&
1943             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
1944                 orsw = list_entry(cli->cl_loi_read_list.next,
1945                                   struct obd_request_slot_waiter, orsw_entry);
1946                 list_del_init(&orsw->orsw_entry);
1947                 cli->cl_r_in_flight++;
1948                 wake_up(&orsw->orsw_waitq);
1949         }
1950         spin_unlock(&cli->cl_loi_list_lock);
1951 }
1952 EXPORT_SYMBOL(obd_put_request_slot);
1953
1954 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1955 {
1956         return cli->cl_max_rpcs_in_flight;
1957 }
1958 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
1959
1960 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1961 {
1962         struct obd_request_slot_waiter *orsw;
1963         __u32                           old;
1964         int                             diff;
1965         int                             i;
1966
1967         if (max > OBD_MAX_RIF_MAX || max < 1)
1968                 return -ERANGE;
1969
1970         spin_lock(&cli->cl_loi_list_lock);
1971         old = cli->cl_max_rpcs_in_flight;
1972         cli->cl_max_rpcs_in_flight = max;
1973         diff = max - old;
1974
1975         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
1976         for (i = 0; i < diff; i++) {
1977                 if (list_empty(&cli->cl_loi_read_list))
1978                         break;
1979
1980                 orsw = list_entry(cli->cl_loi_read_list.next,
1981                                   struct obd_request_slot_waiter, orsw_entry);
1982                 list_del_init(&orsw->orsw_entry);
1983                 cli->cl_r_in_flight++;
1984                 wake_up(&orsw->orsw_waitq);
1985         }
1986         spin_unlock(&cli->cl_loi_list_lock);
1987
1988         return 0;
1989 }
1990 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);