Whamcloud - gitweb
LU-6245 libcfs: remove prim wrappers for libcfs
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43
44 #include <linux/kthread.h>
45 #include <obd_class.h>
46 #include <lprocfs_status.h>
47 #include <lustre_kernelcomm.h>
48
49 spinlock_t obd_types_lock;
50
51 static struct kmem_cache *obd_device_cachep;
52 struct kmem_cache *obdo_cachep;
53 EXPORT_SYMBOL(obdo_cachep);
54 static struct kmem_cache *import_cachep;
55
56 static struct list_head obd_zombie_imports;
57 static struct list_head obd_zombie_exports;
58 static spinlock_t  obd_zombie_impexp_lock;
59
60 static void obd_zombie_impexp_notify(void);
61 static void obd_zombie_export_add(struct obd_export *exp);
62 static void obd_zombie_import_add(struct obd_import *imp);
63 static void print_export_data(struct obd_export *exp,
64                               const char *status, int locks);
65
66 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
68
69 /*
70  * support functions: we could use inter-module communication, but this
71  * is more portable to other OS's
72  */
73 static struct obd_device *obd_device_alloc(void)
74 {
75         struct obd_device *obd;
76
77         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
78         if (obd != NULL) {
79                 obd->obd_magic = OBD_DEVICE_MAGIC;
80         }
81         return obd;
82 }
83
84 static void obd_device_free(struct obd_device *obd)
85 {
86         LASSERT(obd != NULL);
87         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
89         if (obd->obd_namespace != NULL) {
90                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91                        obd, obd->obd_namespace, obd->obd_force);
92                 LBUG();
93         }
94         lu_ref_fini(&obd->obd_reference);
95         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
96 }
97
98 struct obd_type *class_search_type(const char *name)
99 {
100         struct list_head *tmp;
101         struct obd_type *type;
102
103         spin_lock(&obd_types_lock);
104         list_for_each(tmp, &obd_types) {
105                 type = list_entry(tmp, struct obd_type, typ_chain);
106                 if (strcmp(type->typ_name, name) == 0) {
107                         spin_unlock(&obd_types_lock);
108                         return type;
109                 }
110         }
111         spin_unlock(&obd_types_lock);
112         return NULL;
113 }
114 EXPORT_SYMBOL(class_search_type);
115
116 struct obd_type *class_get_type(const char *name)
117 {
118         struct obd_type *type = class_search_type(name);
119
120 #ifdef HAVE_MODULE_LOADING_SUPPORT
121         if (!type) {
122                 const char *modname = name;
123
124                 if (strcmp(modname, "obdfilter") == 0)
125                         modname = "ofd";
126
127                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
128                         modname = LUSTRE_OSP_NAME;
129
130                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
131                         modname = LUSTRE_MDT_NAME;
132
133                 if (!request_module("%s", modname)) {
134                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
135                         type = class_search_type(name);
136                 } else {
137                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
138                                            modname);
139                 }
140         }
141 #endif
142         if (type) {
143                 spin_lock(&type->obd_type_lock);
144                 type->typ_refcnt++;
145                 try_module_get(type->typ_dt_ops->o_owner);
146                 spin_unlock(&type->obd_type_lock);
147         }
148         return type;
149 }
150
151 void class_put_type(struct obd_type *type)
152 {
153         LASSERT(type);
154         spin_lock(&type->obd_type_lock);
155         type->typ_refcnt--;
156         module_put(type->typ_dt_ops->o_owner);
157         spin_unlock(&type->obd_type_lock);
158 }
159
160 #define CLASS_MAX_NAME 1024
161
162 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
163                         bool enable_proc, struct lprocfs_vars *vars,
164                         const char *name, struct lu_device_type *ldt)
165 {
166         struct obd_type *type;
167         int rc = 0;
168         ENTRY;
169
170         /* sanity check */
171         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
172
173         if (class_search_type(name)) {
174                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
175                 RETURN(-EEXIST);
176         }
177
178         rc = -ENOMEM;
179         OBD_ALLOC(type, sizeof(*type));
180         if (type == NULL)
181                 RETURN(rc);
182
183         OBD_ALLOC_PTR(type->typ_dt_ops);
184         OBD_ALLOC_PTR(type->typ_md_ops);
185         OBD_ALLOC(type->typ_name, strlen(name) + 1);
186
187         if (type->typ_dt_ops == NULL ||
188             type->typ_md_ops == NULL ||
189             type->typ_name == NULL)
190                 GOTO (failed, rc);
191
192         *(type->typ_dt_ops) = *dt_ops;
193         /* md_ops is optional */
194         if (md_ops)
195                 *(type->typ_md_ops) = *md_ops;
196         strcpy(type->typ_name, name);
197         spin_lock_init(&type->obd_type_lock);
198
199 #ifdef CONFIG_PROC_FS
200         if (enable_proc) {
201                 type->typ_procroot = lprocfs_register(type->typ_name,
202                                                       proc_lustre_root,
203                                                       vars, type);
204                 if (IS_ERR(type->typ_procroot)) {
205                         rc = PTR_ERR(type->typ_procroot);
206                         type->typ_procroot = NULL;
207                         GOTO(failed, rc);
208                 }
209         }
210 #endif
211         if (ldt != NULL) {
212                 type->typ_lu = ldt;
213                 rc = lu_device_type_init(ldt);
214                 if (rc != 0)
215                         GOTO (failed, rc);
216         }
217
218         spin_lock(&obd_types_lock);
219         list_add(&type->typ_chain, &obd_types);
220         spin_unlock(&obd_types_lock);
221
222         RETURN (0);
223
224 failed:
225         if (type->typ_name != NULL) {
226 #ifdef CONFIG_PROC_FS
227                 if (type->typ_procroot != NULL)
228                         remove_proc_subtree(type->typ_name, proc_lustre_root);
229 #endif
230                 OBD_FREE(type->typ_name, strlen(name) + 1);
231         }
232         if (type->typ_md_ops != NULL)
233                 OBD_FREE_PTR(type->typ_md_ops);
234         if (type->typ_dt_ops != NULL)
235                 OBD_FREE_PTR(type->typ_dt_ops);
236         OBD_FREE(type, sizeof(*type));
237         RETURN(rc);
238 }
239 EXPORT_SYMBOL(class_register_type);
240
241 int class_unregister_type(const char *name)
242 {
243         struct obd_type *type = class_search_type(name);
244         ENTRY;
245
246         if (!type) {
247                 CERROR("unknown obd type\n");
248                 RETURN(-EINVAL);
249         }
250
251         if (type->typ_refcnt) {
252                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
253                 /* This is a bad situation, let's make the best of it */
254                 /* Remove ops, but leave the name for debugging */
255                 OBD_FREE_PTR(type->typ_dt_ops);
256                 OBD_FREE_PTR(type->typ_md_ops);
257                 RETURN(-EBUSY);
258         }
259
260         /* we do not use type->typ_procroot as for compatibility purposes
261          * other modules can share names (i.e. lod can use lov entry). so
262          * we can't reference pointer as it can get invalided when another
263          * module removes the entry */
264 #ifdef CONFIG_PROC_FS
265         if (type->typ_procroot != NULL)
266                 remove_proc_subtree(type->typ_name, proc_lustre_root);
267         if (type->typ_procsym != NULL)
268                 lprocfs_remove(&type->typ_procsym);
269 #endif
270         if (type->typ_lu)
271                 lu_device_type_fini(type->typ_lu);
272
273         spin_lock(&obd_types_lock);
274         list_del(&type->typ_chain);
275         spin_unlock(&obd_types_lock);
276         OBD_FREE(type->typ_name, strlen(name) + 1);
277         if (type->typ_dt_ops != NULL)
278                 OBD_FREE_PTR(type->typ_dt_ops);
279         if (type->typ_md_ops != NULL)
280                 OBD_FREE_PTR(type->typ_md_ops);
281         OBD_FREE(type, sizeof(*type));
282         RETURN(0);
283 } /* class_unregister_type */
284 EXPORT_SYMBOL(class_unregister_type);
285
286 /**
287  * Create a new obd device.
288  *
289  * Find an empty slot in ::obd_devs[], create a new obd device in it.
290  *
291  * \param[in] type_name obd device type string.
292  * \param[in] name      obd device name.
293  *
294  * \retval NULL if create fails, otherwise return the obd device
295  *         pointer created.
296  */
297 struct obd_device *class_newdev(const char *type_name, const char *name)
298 {
299         struct obd_device *result = NULL;
300         struct obd_device *newdev;
301         struct obd_type *type = NULL;
302         int i;
303         int new_obd_minor = 0;
304         ENTRY;
305
306         if (strlen(name) >= MAX_OBD_NAME) {
307                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
308                 RETURN(ERR_PTR(-EINVAL));
309         }
310
311         type = class_get_type(type_name);
312         if (type == NULL){
313                 CERROR("OBD: unknown type: %s\n", type_name);
314                 RETURN(ERR_PTR(-ENODEV));
315         }
316
317         newdev = obd_device_alloc();
318         if (newdev == NULL)
319                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
320
321         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
322
323         write_lock(&obd_dev_lock);
324         for (i = 0; i < class_devno_max(); i++) {
325                 struct obd_device *obd = class_num2obd(i);
326
327                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
328                         CERROR("Device %s already exists at %d, won't add\n",
329                                name, i);
330                         if (result) {
331                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
332                                          "%p obd_magic %08x != %08x\n", result,
333                                          result->obd_magic, OBD_DEVICE_MAGIC);
334                                 LASSERTF(result->obd_minor == new_obd_minor,
335                                          "%p obd_minor %d != %d\n", result,
336                                          result->obd_minor, new_obd_minor);
337
338                                 obd_devs[result->obd_minor] = NULL;
339                                 result->obd_name[0]='\0';
340                          }
341                         result = ERR_PTR(-EEXIST);
342                         break;
343                 }
344                 if (!result && !obd) {
345                         result = newdev;
346                         result->obd_minor = i;
347                         new_obd_minor = i;
348                         result->obd_type = type;
349                         strncpy(result->obd_name, name,
350                                 sizeof(result->obd_name) - 1);
351                         obd_devs[i] = result;
352                 }
353         }
354         write_unlock(&obd_dev_lock);
355
356         if (result == NULL && i >= class_devno_max()) {
357                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
358                        class_devno_max());
359                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
360         }
361
362         if (IS_ERR(result))
363                 GOTO(out, result);
364
365         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
366                result->obd_name, result);
367
368         RETURN(result);
369 out:
370         obd_device_free(newdev);
371 out_type:
372         class_put_type(type);
373         return result;
374 }
375
376 void class_release_dev(struct obd_device *obd)
377 {
378         struct obd_type *obd_type = obd->obd_type;
379
380         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
381                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
382         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
383                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
384         LASSERT(obd_type != NULL);
385
386         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
387                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
388
389         write_lock(&obd_dev_lock);
390         obd_devs[obd->obd_minor] = NULL;
391         write_unlock(&obd_dev_lock);
392         obd_device_free(obd);
393
394         class_put_type(obd_type);
395 }
396
397 int class_name2dev(const char *name)
398 {
399         int i;
400
401         if (!name)
402                 return -1;
403
404         read_lock(&obd_dev_lock);
405         for (i = 0; i < class_devno_max(); i++) {
406                 struct obd_device *obd = class_num2obd(i);
407
408                 if (obd && strcmp(name, obd->obd_name) == 0) {
409                         /* Make sure we finished attaching before we give
410                            out any references */
411                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
412                         if (obd->obd_attached) {
413                                 read_unlock(&obd_dev_lock);
414                                 return i;
415                         }
416                         break;
417                 }
418         }
419         read_unlock(&obd_dev_lock);
420
421         return -1;
422 }
423
424 struct obd_device *class_name2obd(const char *name)
425 {
426         int dev = class_name2dev(name);
427
428         if (dev < 0 || dev > class_devno_max())
429                 return NULL;
430         return class_num2obd(dev);
431 }
432 EXPORT_SYMBOL(class_name2obd);
433
434 int class_uuid2dev(struct obd_uuid *uuid)
435 {
436         int i;
437
438         read_lock(&obd_dev_lock);
439         for (i = 0; i < class_devno_max(); i++) {
440                 struct obd_device *obd = class_num2obd(i);
441
442                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
443                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
444                         read_unlock(&obd_dev_lock);
445                         return i;
446                 }
447         }
448         read_unlock(&obd_dev_lock);
449
450         return -1;
451 }
452
453 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
454 {
455         int dev = class_uuid2dev(uuid);
456         if (dev < 0)
457                 return NULL;
458         return class_num2obd(dev);
459 }
460 EXPORT_SYMBOL(class_uuid2obd);
461
462 /**
463  * Get obd device from ::obd_devs[]
464  *
465  * \param num [in] array index
466  *
467  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
468  *         otherwise return the obd device there.
469  */
470 struct obd_device *class_num2obd(int num)
471 {
472         struct obd_device *obd = NULL;
473
474         if (num < class_devno_max()) {
475                 obd = obd_devs[num];
476                 if (obd == NULL)
477                         return NULL;
478
479                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
480                          "%p obd_magic %08x != %08x\n",
481                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
482                 LASSERTF(obd->obd_minor == num,
483                          "%p obd_minor %0d != %0d\n",
484                          obd, obd->obd_minor, num);
485         }
486
487         return obd;
488 }
489
490 /**
491  * Get obd devices count. Device in any
492  *    state are counted
493  * \retval obd device count
494  */
495 int get_devices_count(void)
496 {
497         int index, max_index = class_devno_max(), dev_count = 0;
498
499         read_lock(&obd_dev_lock);
500         for (index = 0; index <= max_index; index++) {
501                 struct obd_device *obd = class_num2obd(index);
502                 if (obd != NULL)
503                         dev_count++;
504         }
505         read_unlock(&obd_dev_lock);
506
507         return dev_count;
508 }
509 EXPORT_SYMBOL(get_devices_count);
510
511 void class_obd_list(void)
512 {
513         char *status;
514         int i;
515
516         read_lock(&obd_dev_lock);
517         for (i = 0; i < class_devno_max(); i++) {
518                 struct obd_device *obd = class_num2obd(i);
519
520                 if (obd == NULL)
521                         continue;
522                 if (obd->obd_stopping)
523                         status = "ST";
524                 else if (obd->obd_set_up)
525                         status = "UP";
526                 else if (obd->obd_attached)
527                         status = "AT";
528                 else
529                         status = "--";
530                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
531                          i, status, obd->obd_type->typ_name,
532                          obd->obd_name, obd->obd_uuid.uuid,
533                          atomic_read(&obd->obd_refcount));
534         }
535         read_unlock(&obd_dev_lock);
536         return;
537 }
538
539 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
540    specified, then only the client with that uuid is returned,
541    otherwise any client connected to the tgt is returned. */
542 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
543                                           const char * typ_name,
544                                           struct obd_uuid *grp_uuid)
545 {
546         int i;
547
548         read_lock(&obd_dev_lock);
549         for (i = 0; i < class_devno_max(); i++) {
550                 struct obd_device *obd = class_num2obd(i);
551
552                 if (obd == NULL)
553                         continue;
554                 if ((strncmp(obd->obd_type->typ_name, typ_name,
555                              strlen(typ_name)) == 0)) {
556                         if (obd_uuid_equals(tgt_uuid,
557                                             &obd->u.cli.cl_target_uuid) &&
558                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
559                                                          &obd->obd_uuid) : 1)) {
560                                 read_unlock(&obd_dev_lock);
561                                 return obd;
562                         }
563                 }
564         }
565         read_unlock(&obd_dev_lock);
566
567         return NULL;
568 }
569 EXPORT_SYMBOL(class_find_client_obd);
570
571 /* Iterate the obd_device list looking devices have grp_uuid. Start
572    searching at *next, and if a device is found, the next index to look
573    at is saved in *next. If next is NULL, then the first matching device
574    will always be returned. */
575 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
576 {
577         int i;
578
579         if (next == NULL)
580                 i = 0;
581         else if (*next >= 0 && *next < class_devno_max())
582                 i = *next;
583         else
584                 return NULL;
585
586         read_lock(&obd_dev_lock);
587         for (; i < class_devno_max(); i++) {
588                 struct obd_device *obd = class_num2obd(i);
589
590                 if (obd == NULL)
591                         continue;
592                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
593                         if (next != NULL)
594                                 *next = i+1;
595                         read_unlock(&obd_dev_lock);
596                         return obd;
597                 }
598         }
599         read_unlock(&obd_dev_lock);
600
601         return NULL;
602 }
603 EXPORT_SYMBOL(class_devices_in_group);
604
605 /**
606  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
607  * adjust sptlrpc settings accordingly.
608  */
609 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
610 {
611         struct obd_device  *obd;
612         const char         *type;
613         int                 i, rc = 0, rc2;
614
615         LASSERT(namelen > 0);
616
617         read_lock(&obd_dev_lock);
618         for (i = 0; i < class_devno_max(); i++) {
619                 obd = class_num2obd(i);
620
621                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
622                         continue;
623
624                 /* only notify mdc, osc, mdt, ost */
625                 type = obd->obd_type->typ_name;
626                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
627                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
628                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
629                     strcmp(type, LUSTRE_OST_NAME) != 0)
630                         continue;
631
632                 if (strncmp(obd->obd_name, fsname, namelen))
633                         continue;
634
635                 class_incref(obd, __FUNCTION__, obd);
636                 read_unlock(&obd_dev_lock);
637                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
638                                          sizeof(KEY_SPTLRPC_CONF),
639                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
640                 rc = rc ? rc : rc2;
641                 class_decref(obd, __FUNCTION__, obd);
642                 read_lock(&obd_dev_lock);
643         }
644         read_unlock(&obd_dev_lock);
645         return rc;
646 }
647 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
648
649 void obd_cleanup_caches(void)
650 {
651         ENTRY;
652         if (obd_device_cachep) {
653                 kmem_cache_destroy(obd_device_cachep);
654                 obd_device_cachep = NULL;
655         }
656         if (obdo_cachep) {
657                 kmem_cache_destroy(obdo_cachep);
658                 obdo_cachep = NULL;
659         }
660         if (import_cachep) {
661                 kmem_cache_destroy(import_cachep);
662                 import_cachep = NULL;
663         }
664         if (capa_cachep) {
665                 kmem_cache_destroy(capa_cachep);
666                 capa_cachep = NULL;
667         }
668         EXIT;
669 }
670
671 int obd_init_caches(void)
672 {
673         int rc;
674         ENTRY;
675
676         LASSERT(obd_device_cachep == NULL);
677         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
678                                               sizeof(struct obd_device),
679                                               0, 0, NULL);
680         if (!obd_device_cachep)
681                 GOTO(out, rc = -ENOMEM);
682
683         LASSERT(obdo_cachep == NULL);
684         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
685                                         0, 0, NULL);
686         if (!obdo_cachep)
687                 GOTO(out, rc = -ENOMEM);
688
689         LASSERT(import_cachep == NULL);
690         import_cachep = kmem_cache_create("ll_import_cache",
691                                           sizeof(struct obd_import),
692                                           0, 0, NULL);
693         if (!import_cachep)
694                 GOTO(out, rc = -ENOMEM);
695
696         LASSERT(capa_cachep == NULL);
697         capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
698                                         0, 0, NULL);
699         if (!capa_cachep)
700                 GOTO(out, rc = -ENOMEM);
701
702         RETURN(0);
703 out:
704         obd_cleanup_caches();
705         RETURN(rc);
706 }
707
708 /* map connection to client */
709 struct obd_export *class_conn2export(struct lustre_handle *conn)
710 {
711         struct obd_export *export;
712         ENTRY;
713
714         if (!conn) {
715                 CDEBUG(D_CACHE, "looking for null handle\n");
716                 RETURN(NULL);
717         }
718
719         if (conn->cookie == -1) {  /* this means assign a new connection */
720                 CDEBUG(D_CACHE, "want a new connection\n");
721                 RETURN(NULL);
722         }
723
724         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
725         export = class_handle2object(conn->cookie, NULL);
726         RETURN(export);
727 }
728 EXPORT_SYMBOL(class_conn2export);
729
730 struct obd_device *class_exp2obd(struct obd_export *exp)
731 {
732         if (exp)
733                 return exp->exp_obd;
734         return NULL;
735 }
736 EXPORT_SYMBOL(class_exp2obd);
737
738 struct obd_device *class_conn2obd(struct lustre_handle *conn)
739 {
740         struct obd_export *export;
741         export = class_conn2export(conn);
742         if (export) {
743                 struct obd_device *obd = export->exp_obd;
744                 class_export_put(export);
745                 return obd;
746         }
747         return NULL;
748 }
749
750 struct obd_import *class_exp2cliimp(struct obd_export *exp)
751 {
752         struct obd_device *obd = exp->exp_obd;
753         if (obd == NULL)
754                 return NULL;
755         return obd->u.cli.cl_import;
756 }
757 EXPORT_SYMBOL(class_exp2cliimp);
758
759 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
760 {
761         struct obd_device *obd = class_conn2obd(conn);
762         if (obd == NULL)
763                 return NULL;
764         return obd->u.cli.cl_import;
765 }
766
767 /* Export management functions */
768 static void class_export_destroy(struct obd_export *exp)
769 {
770         struct obd_device *obd = exp->exp_obd;
771         ENTRY;
772
773         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
774         LASSERT(obd != NULL);
775
776         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
777                exp->exp_client_uuid.uuid, obd->obd_name);
778
779         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
780         if (exp->exp_connection)
781                 ptlrpc_put_connection_superhack(exp->exp_connection);
782
783         LASSERT(list_empty(&exp->exp_outstanding_replies));
784         LASSERT(list_empty(&exp->exp_uncommitted_replies));
785         LASSERT(list_empty(&exp->exp_req_replay_queue));
786         LASSERT(list_empty(&exp->exp_hp_rpcs));
787         obd_destroy_export(exp);
788         class_decref(obd, "export", exp);
789
790         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
791         EXIT;
792 }
793
794 static void export_handle_addref(void *export)
795 {
796         class_export_get(export);
797 }
798
799 static struct portals_handle_ops export_handle_ops = {
800         .hop_addref = export_handle_addref,
801         .hop_free   = NULL,
802 };
803
804 struct obd_export *class_export_get(struct obd_export *exp)
805 {
806         atomic_inc(&exp->exp_refcount);
807         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
808                atomic_read(&exp->exp_refcount));
809         return exp;
810 }
811 EXPORT_SYMBOL(class_export_get);
812
813 void class_export_put(struct obd_export *exp)
814 {
815         LASSERT(exp != NULL);
816         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
817         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
818                atomic_read(&exp->exp_refcount) - 1);
819
820         if (atomic_dec_and_test(&exp->exp_refcount)) {
821                 LASSERT(!list_empty(&exp->exp_obd_chain));
822                 CDEBUG(D_IOCTL, "final put %p/%s\n",
823                        exp, exp->exp_client_uuid.uuid);
824
825                 /* release nid stat refererence */
826                 lprocfs_exp_cleanup(exp);
827
828                 obd_zombie_export_add(exp);
829         }
830 }
831 EXPORT_SYMBOL(class_export_put);
832
833 /* Creates a new export, adds it to the hash table, and returns a
834  * pointer to it. The refcount is 2: one for the hash reference, and
835  * one for the pointer returned by this function. */
836 struct obd_export *class_new_export(struct obd_device *obd,
837                                     struct obd_uuid *cluuid)
838 {
839         struct obd_export *export;
840         cfs_hash_t *hash = NULL;
841         int rc = 0;
842         ENTRY;
843
844         OBD_ALLOC_PTR(export);
845         if (!export)
846                 return ERR_PTR(-ENOMEM);
847
848         export->exp_conn_cnt = 0;
849         export->exp_lock_hash = NULL;
850         export->exp_flock_hash = NULL;
851         atomic_set(&export->exp_refcount, 2);
852         atomic_set(&export->exp_rpc_count, 0);
853         atomic_set(&export->exp_cb_count, 0);
854         atomic_set(&export->exp_locks_count, 0);
855 #if LUSTRE_TRACKS_LOCK_EXP_REFS
856         INIT_LIST_HEAD(&export->exp_locks_list);
857         spin_lock_init(&export->exp_locks_list_guard);
858 #endif
859         atomic_set(&export->exp_replay_count, 0);
860         export->exp_obd = obd;
861         INIT_LIST_HEAD(&export->exp_outstanding_replies);
862         spin_lock_init(&export->exp_uncommitted_replies_lock);
863         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
864         INIT_LIST_HEAD(&export->exp_req_replay_queue);
865         INIT_LIST_HEAD(&export->exp_handle.h_link);
866         INIT_LIST_HEAD(&export->exp_hp_rpcs);
867         INIT_LIST_HEAD(&export->exp_reg_rpcs);
868         class_handle_hash(&export->exp_handle, &export_handle_ops);
869         export->exp_last_request_time = cfs_time_current_sec();
870         spin_lock_init(&export->exp_lock);
871         spin_lock_init(&export->exp_rpc_lock);
872         INIT_HLIST_NODE(&export->exp_uuid_hash);
873         INIT_HLIST_NODE(&export->exp_nid_hash);
874         spin_lock_init(&export->exp_bl_list_lock);
875         INIT_LIST_HEAD(&export->exp_bl_list);
876
877         export->exp_sp_peer = LUSTRE_SP_ANY;
878         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
879         export->exp_client_uuid = *cluuid;
880         obd_init_export(export);
881
882         spin_lock(&obd->obd_dev_lock);
883         /* shouldn't happen, but might race */
884         if (obd->obd_stopping)
885                 GOTO(exit_unlock, rc = -ENODEV);
886
887         hash = cfs_hash_getref(obd->obd_uuid_hash);
888         if (hash == NULL)
889                 GOTO(exit_unlock, rc = -ENODEV);
890         spin_unlock(&obd->obd_dev_lock);
891
892         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
893                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
894                 if (rc != 0) {
895                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
896                                       obd->obd_name, cluuid->uuid, rc);
897                         GOTO(exit_err, rc = -EALREADY);
898                 }
899         }
900
901         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
902         spin_lock(&obd->obd_dev_lock);
903         if (obd->obd_stopping) {
904                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
905                 GOTO(exit_unlock, rc = -ENODEV);
906         }
907
908         class_incref(obd, "export", export);
909         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
910         list_add_tail(&export->exp_obd_chain_timed,
911                       &export->exp_obd->obd_exports_timed);
912         export->exp_obd->obd_num_exports++;
913         spin_unlock(&obd->obd_dev_lock);
914         cfs_hash_putref(hash);
915         RETURN(export);
916
917 exit_unlock:
918         spin_unlock(&obd->obd_dev_lock);
919 exit_err:
920         if (hash)
921                 cfs_hash_putref(hash);
922         class_handle_unhash(&export->exp_handle);
923         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
924         obd_destroy_export(export);
925         OBD_FREE_PTR(export);
926         return ERR_PTR(rc);
927 }
928 EXPORT_SYMBOL(class_new_export);
929
930 void class_unlink_export(struct obd_export *exp)
931 {
932         class_handle_unhash(&exp->exp_handle);
933
934         spin_lock(&exp->exp_obd->obd_dev_lock);
935         /* delete an uuid-export hashitem from hashtables */
936         if (!hlist_unhashed(&exp->exp_uuid_hash))
937                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
938                              &exp->exp_client_uuid,
939                              &exp->exp_uuid_hash);
940
941         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
942         list_del_init(&exp->exp_obd_chain_timed);
943         exp->exp_obd->obd_num_exports--;
944         spin_unlock(&exp->exp_obd->obd_dev_lock);
945         class_export_put(exp);
946 }
947
948 /* Import management functions */
949 static void class_import_destroy(struct obd_import *imp)
950 {
951         ENTRY;
952
953         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
954                 imp->imp_obd->obd_name);
955
956         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
957
958         ptlrpc_put_connection_superhack(imp->imp_connection);
959
960         while (!list_empty(&imp->imp_conn_list)) {
961                 struct obd_import_conn *imp_conn;
962
963                 imp_conn = list_entry(imp->imp_conn_list.next,
964                                       struct obd_import_conn, oic_item);
965                 list_del_init(&imp_conn->oic_item);
966                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
967                 OBD_FREE(imp_conn, sizeof(*imp_conn));
968         }
969
970         LASSERT(imp->imp_sec == NULL);
971         class_decref(imp->imp_obd, "import", imp);
972         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
973         EXIT;
974 }
975
976 static void import_handle_addref(void *import)
977 {
978         class_import_get(import);
979 }
980
981 static struct portals_handle_ops import_handle_ops = {
982         .hop_addref = import_handle_addref,
983         .hop_free   = NULL,
984 };
985
986 struct obd_import *class_import_get(struct obd_import *import)
987 {
988         atomic_inc(&import->imp_refcount);
989         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
990                atomic_read(&import->imp_refcount),
991                import->imp_obd->obd_name);
992         return import;
993 }
994 EXPORT_SYMBOL(class_import_get);
995
996 void class_import_put(struct obd_import *imp)
997 {
998         ENTRY;
999
1000         LASSERT(list_empty(&imp->imp_zombie_chain));
1001         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1002
1003         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1004                atomic_read(&imp->imp_refcount) - 1,
1005                imp->imp_obd->obd_name);
1006
1007         if (atomic_dec_and_test(&imp->imp_refcount)) {
1008                 CDEBUG(D_INFO, "final put import %p\n", imp);
1009                 obd_zombie_import_add(imp);
1010         }
1011
1012         /* catch possible import put race */
1013         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1014         EXIT;
1015 }
1016 EXPORT_SYMBOL(class_import_put);
1017
1018 static void init_imp_at(struct imp_at *at) {
1019         int i;
1020         at_init(&at->iat_net_latency, 0, 0);
1021         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1022                 /* max service estimates are tracked on the server side, so
1023                    don't use the AT history here, just use the last reported
1024                    val. (But keep hist for proc histogram, worst_ever) */
1025                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1026                         AT_FLG_NOHIST);
1027         }
1028 }
1029
1030 struct obd_import *class_new_import(struct obd_device *obd)
1031 {
1032         struct obd_import *imp;
1033
1034         OBD_ALLOC(imp, sizeof(*imp));
1035         if (imp == NULL)
1036                 return NULL;
1037
1038         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1039         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1040         INIT_LIST_HEAD(&imp->imp_replay_list);
1041         INIT_LIST_HEAD(&imp->imp_sending_list);
1042         INIT_LIST_HEAD(&imp->imp_delayed_list);
1043         INIT_LIST_HEAD(&imp->imp_committed_list);
1044         imp->imp_replay_cursor = &imp->imp_committed_list;
1045         spin_lock_init(&imp->imp_lock);
1046         imp->imp_last_success_conn = 0;
1047         imp->imp_state = LUSTRE_IMP_NEW;
1048         imp->imp_obd = class_incref(obd, "import", imp);
1049         mutex_init(&imp->imp_sec_mutex);
1050         init_waitqueue_head(&imp->imp_recovery_waitq);
1051
1052         atomic_set(&imp->imp_refcount, 2);
1053         atomic_set(&imp->imp_unregistering, 0);
1054         atomic_set(&imp->imp_inflight, 0);
1055         atomic_set(&imp->imp_replay_inflight, 0);
1056         atomic_set(&imp->imp_inval_count, 0);
1057         INIT_LIST_HEAD(&imp->imp_conn_list);
1058         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1059         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1060         init_imp_at(&imp->imp_at);
1061
1062         /* the default magic is V2, will be used in connect RPC, and
1063          * then adjusted according to the flags in request/reply. */
1064         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1065
1066         return imp;
1067 }
1068 EXPORT_SYMBOL(class_new_import);
1069
1070 void class_destroy_import(struct obd_import *import)
1071 {
1072         LASSERT(import != NULL);
1073         LASSERT(import != LP_POISON);
1074
1075         class_handle_unhash(&import->imp_handle);
1076
1077         spin_lock(&import->imp_lock);
1078         import->imp_generation++;
1079         spin_unlock(&import->imp_lock);
1080         class_import_put(import);
1081 }
1082 EXPORT_SYMBOL(class_destroy_import);
1083
1084 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1085
1086 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1087 {
1088         spin_lock(&exp->exp_locks_list_guard);
1089
1090         LASSERT(lock->l_exp_refs_nr >= 0);
1091
1092         if (lock->l_exp_refs_target != NULL &&
1093             lock->l_exp_refs_target != exp) {
1094                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1095                               exp, lock, lock->l_exp_refs_target);
1096         }
1097         if ((lock->l_exp_refs_nr ++) == 0) {
1098                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1099                 lock->l_exp_refs_target = exp;
1100         }
1101         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1102                lock, exp, lock->l_exp_refs_nr);
1103         spin_unlock(&exp->exp_locks_list_guard);
1104 }
1105
1106 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1107 {
1108         spin_lock(&exp->exp_locks_list_guard);
1109         LASSERT(lock->l_exp_refs_nr > 0);
1110         if (lock->l_exp_refs_target != exp) {
1111                 LCONSOLE_WARN("lock %p, "
1112                               "mismatching export pointers: %p, %p\n",
1113                               lock, lock->l_exp_refs_target, exp);
1114         }
1115         if (-- lock->l_exp_refs_nr == 0) {
1116                 list_del_init(&lock->l_exp_refs_link);
1117                 lock->l_exp_refs_target = NULL;
1118         }
1119         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1120                lock, exp, lock->l_exp_refs_nr);
1121         spin_unlock(&exp->exp_locks_list_guard);
1122 }
1123 #endif
1124
1125 /* A connection defines an export context in which preallocation can
1126    be managed. This releases the export pointer reference, and returns
1127    the export handle, so the export refcount is 1 when this function
1128    returns. */
1129 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1130                   struct obd_uuid *cluuid)
1131 {
1132         struct obd_export *export;
1133         LASSERT(conn != NULL);
1134         LASSERT(obd != NULL);
1135         LASSERT(cluuid != NULL);
1136         ENTRY;
1137
1138         export = class_new_export(obd, cluuid);
1139         if (IS_ERR(export))
1140                 RETURN(PTR_ERR(export));
1141
1142         conn->cookie = export->exp_handle.h_cookie;
1143         class_export_put(export);
1144
1145         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1146                cluuid->uuid, conn->cookie);
1147         RETURN(0);
1148 }
1149 EXPORT_SYMBOL(class_connect);
1150
1151 /* if export is involved in recovery then clean up related things */
1152 static void class_export_recovery_cleanup(struct obd_export *exp)
1153 {
1154         struct obd_device *obd = exp->exp_obd;
1155
1156         spin_lock(&obd->obd_recovery_task_lock);
1157         if (obd->obd_recovering) {
1158                 if (exp->exp_in_recovery) {
1159                         spin_lock(&exp->exp_lock);
1160                         exp->exp_in_recovery = 0;
1161                         spin_unlock(&exp->exp_lock);
1162                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1163                         atomic_dec(&obd->obd_connected_clients);
1164                 }
1165
1166                 /* if called during recovery then should update
1167                  * obd_stale_clients counter,
1168                  * lightweight exports are not counted */
1169                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1170                         exp->exp_obd->obd_stale_clients++;
1171         }
1172         spin_unlock(&obd->obd_recovery_task_lock);
1173
1174         spin_lock(&exp->exp_lock);
1175         /** Cleanup req replay fields */
1176         if (exp->exp_req_replay_needed) {
1177                 exp->exp_req_replay_needed = 0;
1178
1179                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1180                 atomic_dec(&obd->obd_req_replay_clients);
1181         }
1182
1183         /** Cleanup lock replay data */
1184         if (exp->exp_lock_replay_needed) {
1185                 exp->exp_lock_replay_needed = 0;
1186
1187                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1188                 atomic_dec(&obd->obd_lock_replay_clients);
1189         }
1190         spin_unlock(&exp->exp_lock);
1191 }
1192
1193 /* This function removes 1-3 references from the export:
1194  * 1 - for export pointer passed
1195  * and if disconnect really need
1196  * 2 - removing from hash
1197  * 3 - in client_unlink_export
1198  * The export pointer passed to this function can destroyed */
1199 int class_disconnect(struct obd_export *export)
1200 {
1201         int already_disconnected;
1202         ENTRY;
1203
1204         if (export == NULL) {
1205                 CWARN("attempting to free NULL export %p\n", export);
1206                 RETURN(-EINVAL);
1207         }
1208
1209         spin_lock(&export->exp_lock);
1210         already_disconnected = export->exp_disconnected;
1211         export->exp_disconnected = 1;
1212         spin_unlock(&export->exp_lock);
1213
1214         /* class_cleanup(), abort_recovery(), and class_fail_export()
1215          * all end up in here, and if any of them race we shouldn't
1216          * call extra class_export_puts(). */
1217         if (already_disconnected) {
1218                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1219                 GOTO(no_disconn, already_disconnected);
1220         }
1221
1222         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1223                export->exp_handle.h_cookie);
1224
1225         if (!hlist_unhashed(&export->exp_nid_hash))
1226                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1227                              &export->exp_connection->c_peer.nid,
1228                              &export->exp_nid_hash);
1229
1230         class_export_recovery_cleanup(export);
1231         class_unlink_export(export);
1232 no_disconn:
1233         class_export_put(export);
1234         RETURN(0);
1235 }
1236 EXPORT_SYMBOL(class_disconnect);
1237
1238 /* Return non-zero for a fully connected export */
1239 int class_connected_export(struct obd_export *exp)
1240 {
1241         int connected = 0;
1242
1243         if (exp) {
1244                 spin_lock(&exp->exp_lock);
1245                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1246                 spin_unlock(&exp->exp_lock);
1247         }
1248         return connected;
1249 }
1250 EXPORT_SYMBOL(class_connected_export);
1251
1252 static void class_disconnect_export_list(struct list_head *list,
1253                                          enum obd_option flags)
1254 {
1255         int rc;
1256         struct obd_export *exp;
1257         ENTRY;
1258
1259         /* It's possible that an export may disconnect itself, but
1260          * nothing else will be added to this list. */
1261         while (!list_empty(list)) {
1262                 exp = list_entry(list->next, struct obd_export,
1263                                  exp_obd_chain);
1264                 /* need for safe call CDEBUG after obd_disconnect */
1265                 class_export_get(exp);
1266
1267                 spin_lock(&exp->exp_lock);
1268                 exp->exp_flags = flags;
1269                 spin_unlock(&exp->exp_lock);
1270
1271                 if (obd_uuid_equals(&exp->exp_client_uuid,
1272                                     &exp->exp_obd->obd_uuid)) {
1273                         CDEBUG(D_HA,
1274                                "exp %p export uuid == obd uuid, don't discon\n",
1275                                exp);
1276                         /* Need to delete this now so we don't end up pointing
1277                          * to work_list later when this export is cleaned up. */
1278                         list_del_init(&exp->exp_obd_chain);
1279                         class_export_put(exp);
1280                         continue;
1281                 }
1282
1283                 class_export_get(exp);
1284                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1285                        "last request at "CFS_TIME_T"\n",
1286                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1287                        exp, exp->exp_last_request_time);
1288                 /* release one export reference anyway */
1289                 rc = obd_disconnect(exp);
1290
1291                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1292                        obd_export_nid2str(exp), exp, rc);
1293                 class_export_put(exp);
1294         }
1295         EXIT;
1296 }
1297
1298 void class_disconnect_exports(struct obd_device *obd)
1299 {
1300         struct list_head work_list;
1301         ENTRY;
1302
1303         /* Move all of the exports from obd_exports to a work list, en masse. */
1304         INIT_LIST_HEAD(&work_list);
1305         spin_lock(&obd->obd_dev_lock);
1306         list_splice_init(&obd->obd_exports, &work_list);
1307         list_splice_init(&obd->obd_delayed_exports, &work_list);
1308         spin_unlock(&obd->obd_dev_lock);
1309
1310         if (!list_empty(&work_list)) {
1311                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1312                        "disconnecting them\n", obd->obd_minor, obd);
1313                 class_disconnect_export_list(&work_list,
1314                                              exp_flags_from_obd(obd));
1315         } else
1316                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1317                        obd->obd_minor, obd);
1318         EXIT;
1319 }
1320 EXPORT_SYMBOL(class_disconnect_exports);
1321
1322 /* Remove exports that have not completed recovery.
1323  */
1324 void class_disconnect_stale_exports(struct obd_device *obd,
1325                                     int (*test_export)(struct obd_export *))
1326 {
1327         struct list_head work_list;
1328         struct obd_export *exp, *n;
1329         int evicted = 0;
1330         ENTRY;
1331
1332         INIT_LIST_HEAD(&work_list);
1333         spin_lock(&obd->obd_dev_lock);
1334         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1335                                  exp_obd_chain) {
1336                 /* don't count self-export as client */
1337                 if (obd_uuid_equals(&exp->exp_client_uuid,
1338                                     &exp->exp_obd->obd_uuid))
1339                         continue;
1340
1341                 /* don't evict clients which have no slot in last_rcvd
1342                  * (e.g. lightweight connection) */
1343                 if (exp->exp_target_data.ted_lr_idx == -1)
1344                         continue;
1345
1346                 spin_lock(&exp->exp_lock);
1347                 if (exp->exp_failed || test_export(exp)) {
1348                         spin_unlock(&exp->exp_lock);
1349                         continue;
1350                 }
1351                 exp->exp_failed = 1;
1352                 spin_unlock(&exp->exp_lock);
1353
1354                 list_move(&exp->exp_obd_chain, &work_list);
1355                 evicted++;
1356                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1357                        obd->obd_name, exp->exp_client_uuid.uuid,
1358                        exp->exp_connection == NULL ? "<unknown>" :
1359                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1360                 print_export_data(exp, "EVICTING", 0);
1361         }
1362         spin_unlock(&obd->obd_dev_lock);
1363
1364         if (evicted)
1365                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1366                               obd->obd_name, evicted);
1367
1368         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1369                                                  OBD_OPT_ABORT_RECOV);
1370         EXIT;
1371 }
1372 EXPORT_SYMBOL(class_disconnect_stale_exports);
1373
1374 void class_fail_export(struct obd_export *exp)
1375 {
1376         int rc, already_failed;
1377
1378         spin_lock(&exp->exp_lock);
1379         already_failed = exp->exp_failed;
1380         exp->exp_failed = 1;
1381         spin_unlock(&exp->exp_lock);
1382
1383         if (already_failed) {
1384                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1385                        exp, exp->exp_client_uuid.uuid);
1386                 return;
1387         }
1388
1389         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1390                exp, exp->exp_client_uuid.uuid);
1391
1392         if (obd_dump_on_timeout)
1393                 libcfs_debug_dumplog();
1394
1395         /* need for safe call CDEBUG after obd_disconnect */
1396         class_export_get(exp);
1397
1398         /* Most callers into obd_disconnect are removing their own reference
1399          * (request, for example) in addition to the one from the hash table.
1400          * We don't have such a reference here, so make one. */
1401         class_export_get(exp);
1402         rc = obd_disconnect(exp);
1403         if (rc)
1404                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1405         else
1406                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1407                        exp, exp->exp_client_uuid.uuid);
1408         class_export_put(exp);
1409 }
1410 EXPORT_SYMBOL(class_fail_export);
1411
1412 char *obd_export_nid2str(struct obd_export *exp)
1413 {
1414         if (exp->exp_connection != NULL)
1415                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1416
1417         return "(no nid)";
1418 }
1419 EXPORT_SYMBOL(obd_export_nid2str);
1420
1421 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1422 {
1423         cfs_hash_t *nid_hash;
1424         struct obd_export *doomed_exp = NULL;
1425         int exports_evicted = 0;
1426
1427         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1428
1429         spin_lock(&obd->obd_dev_lock);
1430         /* umount has run already, so evict thread should leave
1431          * its task to umount thread now */
1432         if (obd->obd_stopping) {
1433                 spin_unlock(&obd->obd_dev_lock);
1434                 return exports_evicted;
1435         }
1436         nid_hash = obd->obd_nid_hash;
1437         cfs_hash_getref(nid_hash);
1438         spin_unlock(&obd->obd_dev_lock);
1439
1440         do {
1441                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1442                 if (doomed_exp == NULL)
1443                         break;
1444
1445                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1446                          "nid %s found, wanted nid %s, requested nid %s\n",
1447                          obd_export_nid2str(doomed_exp),
1448                          libcfs_nid2str(nid_key), nid);
1449                 LASSERTF(doomed_exp != obd->obd_self_export,
1450                          "self-export is hashed by NID?\n");
1451                 exports_evicted++;
1452                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1453                               "request\n", obd->obd_name,
1454                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1455                               obd_export_nid2str(doomed_exp));
1456                 class_fail_export(doomed_exp);
1457                 class_export_put(doomed_exp);
1458         } while (1);
1459
1460         cfs_hash_putref(nid_hash);
1461
1462         if (!exports_evicted)
1463                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1464                        obd->obd_name, nid);
1465         return exports_evicted;
1466 }
1467 EXPORT_SYMBOL(obd_export_evict_by_nid);
1468
1469 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1470 {
1471         cfs_hash_t *uuid_hash;
1472         struct obd_export *doomed_exp = NULL;
1473         struct obd_uuid doomed_uuid;
1474         int exports_evicted = 0;
1475
1476         spin_lock(&obd->obd_dev_lock);
1477         if (obd->obd_stopping) {
1478                 spin_unlock(&obd->obd_dev_lock);
1479                 return exports_evicted;
1480         }
1481         uuid_hash = obd->obd_uuid_hash;
1482         cfs_hash_getref(uuid_hash);
1483         spin_unlock(&obd->obd_dev_lock);
1484
1485         obd_str2uuid(&doomed_uuid, uuid);
1486         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1487                 CERROR("%s: can't evict myself\n", obd->obd_name);
1488                 cfs_hash_putref(uuid_hash);
1489                 return exports_evicted;
1490         }
1491
1492         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1493
1494         if (doomed_exp == NULL) {
1495                 CERROR("%s: can't disconnect %s: no exports found\n",
1496                        obd->obd_name, uuid);
1497         } else {
1498                 CWARN("%s: evicting %s at adminstrative request\n",
1499                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1500                 class_fail_export(doomed_exp);
1501                 class_export_put(doomed_exp);
1502                 exports_evicted++;
1503         }
1504         cfs_hash_putref(uuid_hash);
1505
1506         return exports_evicted;
1507 }
1508
1509 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1510 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1511 #endif
1512
1513 static void print_export_data(struct obd_export *exp, const char *status,
1514                               int locks)
1515 {
1516         struct ptlrpc_reply_state *rs;
1517         struct ptlrpc_reply_state *first_reply = NULL;
1518         int nreplies = 0;
1519
1520         spin_lock(&exp->exp_lock);
1521         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1522                             rs_exp_list) {
1523                 if (nreplies == 0)
1524                         first_reply = rs;
1525                 nreplies++;
1526         }
1527         spin_unlock(&exp->exp_lock);
1528
1529         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1530                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1531                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1532                atomic_read(&exp->exp_rpc_count),
1533                atomic_read(&exp->exp_cb_count),
1534                atomic_read(&exp->exp_locks_count),
1535                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1536                nreplies, first_reply, nreplies > 3 ? "..." : "",
1537                exp->exp_last_committed);
1538 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1539         if (locks && class_export_dump_hook != NULL)
1540                 class_export_dump_hook(exp);
1541 #endif
1542 }
1543
1544 void dump_exports(struct obd_device *obd, int locks)
1545 {
1546         struct obd_export *exp;
1547
1548         spin_lock(&obd->obd_dev_lock);
1549         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1550                 print_export_data(exp, "ACTIVE", locks);
1551         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1552                 print_export_data(exp, "UNLINKED", locks);
1553         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1554                 print_export_data(exp, "DELAYED", locks);
1555         spin_unlock(&obd->obd_dev_lock);
1556         spin_lock(&obd_zombie_impexp_lock);
1557         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1558                 print_export_data(exp, "ZOMBIE", locks);
1559         spin_unlock(&obd_zombie_impexp_lock);
1560 }
1561
1562 void obd_exports_barrier(struct obd_device *obd)
1563 {
1564         int waited = 2;
1565         LASSERT(list_empty(&obd->obd_exports));
1566         spin_lock(&obd->obd_dev_lock);
1567         while (!list_empty(&obd->obd_unlinked_exports)) {
1568                 spin_unlock(&obd->obd_dev_lock);
1569                 set_current_state(TASK_UNINTERRUPTIBLE);
1570                 schedule_timeout(cfs_time_seconds(waited));
1571                 if (waited > 5 && IS_PO2(waited)) {
1572                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1573                                       "more than %d seconds. "
1574                                       "The obd refcount = %d. Is it stuck?\n",
1575                                       obd->obd_name, waited,
1576                                       atomic_read(&obd->obd_refcount));
1577                         dump_exports(obd, 1);
1578                 }
1579                 waited *= 2;
1580                 spin_lock(&obd->obd_dev_lock);
1581         }
1582         spin_unlock(&obd->obd_dev_lock);
1583 }
1584 EXPORT_SYMBOL(obd_exports_barrier);
1585
1586 /* Total amount of zombies to be destroyed */
1587 static int zombies_count = 0;
1588
1589 /**
1590  * kill zombie imports and exports
1591  */
1592 void obd_zombie_impexp_cull(void)
1593 {
1594         struct obd_import *import;
1595         struct obd_export *export;
1596         ENTRY;
1597
1598         do {
1599                 spin_lock(&obd_zombie_impexp_lock);
1600
1601                 import = NULL;
1602                 if (!list_empty(&obd_zombie_imports)) {
1603                         import = list_entry(obd_zombie_imports.next,
1604                                             struct obd_import,
1605                                             imp_zombie_chain);
1606                         list_del_init(&import->imp_zombie_chain);
1607                 }
1608
1609                 export = NULL;
1610                 if (!list_empty(&obd_zombie_exports)) {
1611                         export = list_entry(obd_zombie_exports.next,
1612                                             struct obd_export,
1613                                             exp_obd_chain);
1614                         list_del_init(&export->exp_obd_chain);
1615                 }
1616
1617                 spin_unlock(&obd_zombie_impexp_lock);
1618
1619                 if (import != NULL) {
1620                         class_import_destroy(import);
1621                         spin_lock(&obd_zombie_impexp_lock);
1622                         zombies_count--;
1623                         spin_unlock(&obd_zombie_impexp_lock);
1624                 }
1625
1626                 if (export != NULL) {
1627                         class_export_destroy(export);
1628                         spin_lock(&obd_zombie_impexp_lock);
1629                         zombies_count--;
1630                         spin_unlock(&obd_zombie_impexp_lock);
1631                 }
1632
1633                 cond_resched();
1634         } while (import != NULL || export != NULL);
1635         EXIT;
1636 }
1637
1638 static struct completion        obd_zombie_start;
1639 static struct completion        obd_zombie_stop;
1640 static unsigned long            obd_zombie_flags;
1641 static wait_queue_head_t        obd_zombie_waitq;
1642 static pid_t                    obd_zombie_pid;
1643
1644 enum {
1645         OBD_ZOMBIE_STOP         = 0x0001,
1646 };
1647
1648 /**
1649  * check for work for kill zombie import/export thread.
1650  */
1651 static int obd_zombie_impexp_check(void *arg)
1652 {
1653         int rc;
1654
1655         spin_lock(&obd_zombie_impexp_lock);
1656         rc = (zombies_count == 0) &&
1657              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1658         spin_unlock(&obd_zombie_impexp_lock);
1659
1660         RETURN(rc);
1661 }
1662
1663 /**
1664  * Add export to the obd_zombe thread and notify it.
1665  */
1666 static void obd_zombie_export_add(struct obd_export *exp) {
1667         spin_lock(&exp->exp_obd->obd_dev_lock);
1668         LASSERT(!list_empty(&exp->exp_obd_chain));
1669         list_del_init(&exp->exp_obd_chain);
1670         spin_unlock(&exp->exp_obd->obd_dev_lock);
1671         spin_lock(&obd_zombie_impexp_lock);
1672         zombies_count++;
1673         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1674         spin_unlock(&obd_zombie_impexp_lock);
1675
1676         obd_zombie_impexp_notify();
1677 }
1678
1679 /**
1680  * Add import to the obd_zombe thread and notify it.
1681  */
1682 static void obd_zombie_import_add(struct obd_import *imp) {
1683         LASSERT(imp->imp_sec == NULL);
1684         LASSERT(imp->imp_rq_pool == NULL);
1685         spin_lock(&obd_zombie_impexp_lock);
1686         LASSERT(list_empty(&imp->imp_zombie_chain));
1687         zombies_count++;
1688         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1689         spin_unlock(&obd_zombie_impexp_lock);
1690
1691         obd_zombie_impexp_notify();
1692 }
1693
1694 /**
1695  * notify import/export destroy thread about new zombie.
1696  */
1697 static void obd_zombie_impexp_notify(void)
1698 {
1699         /*
1700          * Make sure obd_zomebie_impexp_thread get this notification.
1701          * It is possible this signal only get by obd_zombie_barrier, and
1702          * barrier gulps this notification and sleeps away and hangs ensues
1703          */
1704         wake_up_all(&obd_zombie_waitq);
1705 }
1706
1707 /**
1708  * check whether obd_zombie is idle
1709  */
1710 static int obd_zombie_is_idle(void)
1711 {
1712         int rc;
1713
1714         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1715         spin_lock(&obd_zombie_impexp_lock);
1716         rc = (zombies_count == 0);
1717         spin_unlock(&obd_zombie_impexp_lock);
1718         return rc;
1719 }
1720
1721 /**
1722  * wait when obd_zombie import/export queues become empty
1723  */
1724 void obd_zombie_barrier(void)
1725 {
1726         struct l_wait_info lwi = { 0 };
1727
1728         if (obd_zombie_pid == current_pid())
1729                 /* don't wait for myself */
1730                 return;
1731         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1732 }
1733 EXPORT_SYMBOL(obd_zombie_barrier);
1734
1735
1736 /**
1737  * destroy zombie export/import thread.
1738  */
1739 static int obd_zombie_impexp_thread(void *unused)
1740 {
1741         unshare_fs_struct();
1742         complete(&obd_zombie_start);
1743
1744         obd_zombie_pid = current_pid();
1745
1746         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1747                 struct l_wait_info lwi = { 0 };
1748
1749                 l_wait_event(obd_zombie_waitq,
1750                              !obd_zombie_impexp_check(NULL), &lwi);
1751                 obd_zombie_impexp_cull();
1752
1753                 /*
1754                  * Notify obd_zombie_barrier callers that queues
1755                  * may be empty.
1756                  */
1757                 wake_up(&obd_zombie_waitq);
1758         }
1759
1760         complete(&obd_zombie_stop);
1761
1762         RETURN(0);
1763 }
1764
1765
1766 /**
1767  * start destroy zombie import/export thread
1768  */
1769 int obd_zombie_impexp_init(void)
1770 {
1771         struct task_struct *task;
1772
1773         INIT_LIST_HEAD(&obd_zombie_imports);
1774
1775         INIT_LIST_HEAD(&obd_zombie_exports);
1776         spin_lock_init(&obd_zombie_impexp_lock);
1777         init_completion(&obd_zombie_start);
1778         init_completion(&obd_zombie_stop);
1779         init_waitqueue_head(&obd_zombie_waitq);
1780         obd_zombie_pid = 0;
1781
1782         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1783         if (IS_ERR(task))
1784                 RETURN(PTR_ERR(task));
1785
1786         wait_for_completion(&obd_zombie_start);
1787         RETURN(0);
1788 }
1789 /**
1790  * stop destroy zombie import/export thread
1791  */
1792 void obd_zombie_impexp_stop(void)
1793 {
1794         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1795         obd_zombie_impexp_notify();
1796         wait_for_completion(&obd_zombie_stop);
1797 }
1798
1799 /***** Kernel-userspace comm helpers *******/
1800
1801 /* Get length of entire message, including header */
1802 int kuc_len(int payload_len)
1803 {
1804         return sizeof(struct kuc_hdr) + payload_len;
1805 }
1806 EXPORT_SYMBOL(kuc_len);
1807
1808 /* Get a pointer to kuc header, given a ptr to the payload
1809  * @param p Pointer to payload area
1810  * @returns Pointer to kuc header
1811  */
1812 struct kuc_hdr * kuc_ptr(void *p)
1813 {
1814         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1815         LASSERT(lh->kuc_magic == KUC_MAGIC);
1816         return lh;
1817 }
1818 EXPORT_SYMBOL(kuc_ptr);
1819
1820 /* Test if payload is part of kuc message
1821  * @param p Pointer to payload area
1822  * @returns boolean
1823  */
1824 int kuc_ispayload(void *p)
1825 {
1826         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1827
1828         if (kh->kuc_magic == KUC_MAGIC)
1829                 return 1;
1830         else
1831                 return 0;
1832 }
1833 EXPORT_SYMBOL(kuc_ispayload);
1834
1835 /* Alloc space for a message, and fill in header
1836  * @return Pointer to payload area
1837  */
1838 void *kuc_alloc(int payload_len, int transport, int type)
1839 {
1840         struct kuc_hdr *lh;
1841         int len = kuc_len(payload_len);
1842
1843         OBD_ALLOC(lh, len);
1844         if (lh == NULL)
1845                 return ERR_PTR(-ENOMEM);
1846
1847         lh->kuc_magic = KUC_MAGIC;
1848         lh->kuc_transport = transport;
1849         lh->kuc_msgtype = type;
1850         lh->kuc_msglen = len;
1851
1852         return (void *)(lh + 1);
1853 }
1854 EXPORT_SYMBOL(kuc_alloc);
1855
1856 /* Takes pointer to payload area */
1857 inline void kuc_free(void *p, int payload_len)
1858 {
1859         struct kuc_hdr *lh = kuc_ptr(p);
1860         OBD_FREE(lh, kuc_len(payload_len));
1861 }
1862 EXPORT_SYMBOL(kuc_free);
1863
1864 struct obd_request_slot_waiter {
1865         struct list_head        orsw_entry;
1866         wait_queue_head_t       orsw_waitq;
1867         bool                    orsw_signaled;
1868 };
1869
1870 static bool obd_request_slot_avail(struct client_obd *cli,
1871                                    struct obd_request_slot_waiter *orsw)
1872 {
1873         bool avail;
1874
1875         spin_lock(&cli->cl_loi_list_lock);
1876         avail = !!list_empty(&orsw->orsw_entry);
1877         spin_unlock(&cli->cl_loi_list_lock);
1878
1879         return avail;
1880 };
1881
1882 /*
1883  * For network flow control, the RPC sponsor needs to acquire a credit
1884  * before sending the RPC. The credits count for a connection is defined
1885  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1886  * the subsequent RPC sponsors need to wait until others released their
1887  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1888  */
1889 int obd_get_request_slot(struct client_obd *cli)
1890 {
1891         struct obd_request_slot_waiter   orsw;
1892         struct l_wait_info               lwi;
1893         int                              rc;
1894
1895         spin_lock(&cli->cl_loi_list_lock);
1896         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1897                 cli->cl_r_in_flight++;
1898                 spin_unlock(&cli->cl_loi_list_lock);
1899                 return 0;
1900         }
1901
1902         init_waitqueue_head(&orsw.orsw_waitq);
1903         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1904         orsw.orsw_signaled = false;
1905         spin_unlock(&cli->cl_loi_list_lock);
1906
1907         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1908         rc = l_wait_event(orsw.orsw_waitq,
1909                           obd_request_slot_avail(cli, &orsw) ||
1910                           orsw.orsw_signaled,
1911                           &lwi);
1912
1913         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1914          * freed but other (such as obd_put_request_slot) is using it. */
1915         spin_lock(&cli->cl_loi_list_lock);
1916         if (rc != 0) {
1917                 if (!orsw.orsw_signaled) {
1918                         if (list_empty(&orsw.orsw_entry))
1919                                 cli->cl_r_in_flight--;
1920                         else
1921                                 list_del(&orsw.orsw_entry);
1922                 }
1923         }
1924
1925         if (orsw.orsw_signaled) {
1926                 LASSERT(list_empty(&orsw.orsw_entry));
1927
1928                 rc = -EINTR;
1929         }
1930         spin_unlock(&cli->cl_loi_list_lock);
1931
1932         return rc;
1933 }
1934 EXPORT_SYMBOL(obd_get_request_slot);
1935
1936 void obd_put_request_slot(struct client_obd *cli)
1937 {
1938         struct obd_request_slot_waiter *orsw;
1939
1940         spin_lock(&cli->cl_loi_list_lock);
1941         cli->cl_r_in_flight--;
1942
1943         /* If there is free slot, wakeup the first waiter. */
1944         if (!list_empty(&cli->cl_loi_read_list) &&
1945             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
1946                 orsw = list_entry(cli->cl_loi_read_list.next,
1947                                   struct obd_request_slot_waiter, orsw_entry);
1948                 list_del_init(&orsw->orsw_entry);
1949                 cli->cl_r_in_flight++;
1950                 wake_up(&orsw->orsw_waitq);
1951         }
1952         spin_unlock(&cli->cl_loi_list_lock);
1953 }
1954 EXPORT_SYMBOL(obd_put_request_slot);
1955
1956 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1957 {
1958         return cli->cl_max_rpcs_in_flight;
1959 }
1960 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
1961
1962 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1963 {
1964         struct obd_request_slot_waiter *orsw;
1965         __u32                           old;
1966         int                             diff;
1967         int                             i;
1968
1969         if (max > OBD_MAX_RIF_MAX || max < 1)
1970                 return -ERANGE;
1971
1972         spin_lock(&cli->cl_loi_list_lock);
1973         old = cli->cl_max_rpcs_in_flight;
1974         cli->cl_max_rpcs_in_flight = max;
1975         diff = max - old;
1976
1977         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
1978         for (i = 0; i < diff; i++) {
1979                 if (list_empty(&cli->cl_loi_read_list))
1980                         break;
1981
1982                 orsw = list_entry(cli->cl_loi_read_list.next,
1983                                   struct obd_request_slot_waiter, orsw_entry);
1984                 list_del_init(&orsw->orsw_entry);
1985                 cli->cl_r_in_flight++;
1986                 wake_up(&orsw->orsw_waitq);
1987         }
1988         spin_unlock(&cli->cl_loi_list_lock);
1989
1990         return 0;
1991 }
1992 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);