Whamcloud - gitweb
LU-5829 obdclass: remove unnecessary EXPORT_SYMBOL
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include <obd_class.h>
44 #include <lprocfs_status.h>
45
46 spinlock_t obd_types_lock;
47
48 static struct kmem_cache *obd_device_cachep;
49 struct kmem_cache *obdo_cachep;
50 EXPORT_SYMBOL(obdo_cachep);
51 static struct kmem_cache *import_cachep;
52
53 static struct list_head obd_zombie_imports;
54 static struct list_head obd_zombie_exports;
55 static spinlock_t  obd_zombie_impexp_lock;
56
57 static void obd_zombie_impexp_notify(void);
58 static void obd_zombie_export_add(struct obd_export *exp);
59 static void obd_zombie_import_add(struct obd_import *imp);
60 static void print_export_data(struct obd_export *exp,
61                               const char *status, int locks);
62
63 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
64 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
65
66 /*
67  * support functions: we could use inter-module communication, but this
68  * is more portable to other OS's
69  */
70 static struct obd_device *obd_device_alloc(void)
71 {
72         struct obd_device *obd;
73
74         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
75         if (obd != NULL) {
76                 obd->obd_magic = OBD_DEVICE_MAGIC;
77         }
78         return obd;
79 }
80
81 static void obd_device_free(struct obd_device *obd)
82 {
83         LASSERT(obd != NULL);
84         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
85                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
86         if (obd->obd_namespace != NULL) {
87                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
88                        obd, obd->obd_namespace, obd->obd_force);
89                 LBUG();
90         }
91         lu_ref_fini(&obd->obd_reference);
92         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
93 }
94
95 struct obd_type *class_search_type(const char *name)
96 {
97         struct list_head *tmp;
98         struct obd_type *type;
99
100         spin_lock(&obd_types_lock);
101         list_for_each(tmp, &obd_types) {
102                 type = list_entry(tmp, struct obd_type, typ_chain);
103                 if (strcmp(type->typ_name, name) == 0) {
104                         spin_unlock(&obd_types_lock);
105                         return type;
106                 }
107         }
108         spin_unlock(&obd_types_lock);
109         return NULL;
110 }
111 EXPORT_SYMBOL(class_search_type);
112
113 struct obd_type *class_get_type(const char *name)
114 {
115         struct obd_type *type = class_search_type(name);
116
117 #ifdef HAVE_MODULE_LOADING_SUPPORT
118         if (!type) {
119                 const char *modname = name;
120
121                 if (strcmp(modname, "obdfilter") == 0)
122                         modname = "ofd";
123
124                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
125                         modname = LUSTRE_OSP_NAME;
126
127                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
128                         modname = LUSTRE_MDT_NAME;
129
130                 if (!request_module("%s", modname)) {
131                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
132                         type = class_search_type(name);
133                 } else {
134                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
135                                            modname);
136                 }
137         }
138 #endif
139         if (type) {
140                 spin_lock(&type->obd_type_lock);
141                 type->typ_refcnt++;
142                 try_module_get(type->typ_dt_ops->o_owner);
143                 spin_unlock(&type->obd_type_lock);
144         }
145         return type;
146 }
147
148 void class_put_type(struct obd_type *type)
149 {
150         LASSERT(type);
151         spin_lock(&type->obd_type_lock);
152         type->typ_refcnt--;
153         module_put(type->typ_dt_ops->o_owner);
154         spin_unlock(&type->obd_type_lock);
155 }
156
157 #define CLASS_MAX_NAME 1024
158
159 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
160                         bool enable_proc, struct lprocfs_vars *vars,
161                         const char *name, struct lu_device_type *ldt)
162 {
163         struct obd_type *type;
164         int rc = 0;
165         ENTRY;
166
167         /* sanity check */
168         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
169
170         if (class_search_type(name)) {
171                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
172                 RETURN(-EEXIST);
173         }
174
175         rc = -ENOMEM;
176         OBD_ALLOC(type, sizeof(*type));
177         if (type == NULL)
178                 RETURN(rc);
179
180         OBD_ALLOC_PTR(type->typ_dt_ops);
181         OBD_ALLOC_PTR(type->typ_md_ops);
182         OBD_ALLOC(type->typ_name, strlen(name) + 1);
183
184         if (type->typ_dt_ops == NULL ||
185             type->typ_md_ops == NULL ||
186             type->typ_name == NULL)
187                 GOTO (failed, rc);
188
189         *(type->typ_dt_ops) = *dt_ops;
190         /* md_ops is optional */
191         if (md_ops)
192                 *(type->typ_md_ops) = *md_ops;
193         strcpy(type->typ_name, name);
194         spin_lock_init(&type->obd_type_lock);
195
196 #ifdef CONFIG_PROC_FS
197         if (enable_proc) {
198                 type->typ_procroot = lprocfs_register(type->typ_name,
199                                                       proc_lustre_root,
200                                                       vars, type);
201                 if (IS_ERR(type->typ_procroot)) {
202                         rc = PTR_ERR(type->typ_procroot);
203                         type->typ_procroot = NULL;
204                         GOTO(failed, rc);
205                 }
206         }
207 #endif
208         if (ldt != NULL) {
209                 type->typ_lu = ldt;
210                 rc = lu_device_type_init(ldt);
211                 if (rc != 0)
212                         GOTO (failed, rc);
213         }
214
215         spin_lock(&obd_types_lock);
216         list_add(&type->typ_chain, &obd_types);
217         spin_unlock(&obd_types_lock);
218
219         RETURN (0);
220
221 failed:
222         if (type->typ_name != NULL) {
223 #ifdef CONFIG_PROC_FS
224                 if (type->typ_procroot != NULL)
225                         remove_proc_subtree(type->typ_name, proc_lustre_root);
226 #endif
227                 OBD_FREE(type->typ_name, strlen(name) + 1);
228         }
229         if (type->typ_md_ops != NULL)
230                 OBD_FREE_PTR(type->typ_md_ops);
231         if (type->typ_dt_ops != NULL)
232                 OBD_FREE_PTR(type->typ_dt_ops);
233         OBD_FREE(type, sizeof(*type));
234         RETURN(rc);
235 }
236 EXPORT_SYMBOL(class_register_type);
237
238 int class_unregister_type(const char *name)
239 {
240         struct obd_type *type = class_search_type(name);
241         ENTRY;
242
243         if (!type) {
244                 CERROR("unknown obd type\n");
245                 RETURN(-EINVAL);
246         }
247
248         if (type->typ_refcnt) {
249                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
250                 /* This is a bad situation, let's make the best of it */
251                 /* Remove ops, but leave the name for debugging */
252                 OBD_FREE_PTR(type->typ_dt_ops);
253                 OBD_FREE_PTR(type->typ_md_ops);
254                 RETURN(-EBUSY);
255         }
256
257         /* we do not use type->typ_procroot as for compatibility purposes
258          * other modules can share names (i.e. lod can use lov entry). so
259          * we can't reference pointer as it can get invalided when another
260          * module removes the entry */
261 #ifdef CONFIG_PROC_FS
262         if (type->typ_procroot != NULL)
263                 remove_proc_subtree(type->typ_name, proc_lustre_root);
264         if (type->typ_procsym != NULL)
265                 lprocfs_remove(&type->typ_procsym);
266 #endif
267         if (type->typ_lu)
268                 lu_device_type_fini(type->typ_lu);
269
270         spin_lock(&obd_types_lock);
271         list_del(&type->typ_chain);
272         spin_unlock(&obd_types_lock);
273         OBD_FREE(type->typ_name, strlen(name) + 1);
274         if (type->typ_dt_ops != NULL)
275                 OBD_FREE_PTR(type->typ_dt_ops);
276         if (type->typ_md_ops != NULL)
277                 OBD_FREE_PTR(type->typ_md_ops);
278         OBD_FREE(type, sizeof(*type));
279         RETURN(0);
280 } /* class_unregister_type */
281 EXPORT_SYMBOL(class_unregister_type);
282
283 /**
284  * Create a new obd device.
285  *
286  * Find an empty slot in ::obd_devs[], create a new obd device in it.
287  *
288  * \param[in] type_name obd device type string.
289  * \param[in] name      obd device name.
290  *
291  * \retval NULL if create fails, otherwise return the obd device
292  *         pointer created.
293  */
294 struct obd_device *class_newdev(const char *type_name, const char *name)
295 {
296         struct obd_device *result = NULL;
297         struct obd_device *newdev;
298         struct obd_type *type = NULL;
299         int i;
300         int new_obd_minor = 0;
301         ENTRY;
302
303         if (strlen(name) >= MAX_OBD_NAME) {
304                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
305                 RETURN(ERR_PTR(-EINVAL));
306         }
307
308         type = class_get_type(type_name);
309         if (type == NULL){
310                 CERROR("OBD: unknown type: %s\n", type_name);
311                 RETURN(ERR_PTR(-ENODEV));
312         }
313
314         newdev = obd_device_alloc();
315         if (newdev == NULL)
316                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
317
318         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
319
320         write_lock(&obd_dev_lock);
321         for (i = 0; i < class_devno_max(); i++) {
322                 struct obd_device *obd = class_num2obd(i);
323
324                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
325                         CERROR("Device %s already exists at %d, won't add\n",
326                                name, i);
327                         if (result) {
328                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
329                                          "%p obd_magic %08x != %08x\n", result,
330                                          result->obd_magic, OBD_DEVICE_MAGIC);
331                                 LASSERTF(result->obd_minor == new_obd_minor,
332                                          "%p obd_minor %d != %d\n", result,
333                                          result->obd_minor, new_obd_minor);
334
335                                 obd_devs[result->obd_minor] = NULL;
336                                 result->obd_name[0]='\0';
337                          }
338                         result = ERR_PTR(-EEXIST);
339                         break;
340                 }
341                 if (!result && !obd) {
342                         result = newdev;
343                         result->obd_minor = i;
344                         new_obd_minor = i;
345                         result->obd_type = type;
346                         strncpy(result->obd_name, name,
347                                 sizeof(result->obd_name) - 1);
348                         obd_devs[i] = result;
349                 }
350         }
351         write_unlock(&obd_dev_lock);
352
353         if (result == NULL && i >= class_devno_max()) {
354                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
355                        class_devno_max());
356                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
357         }
358
359         if (IS_ERR(result))
360                 GOTO(out, result);
361
362         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
363                result->obd_name, result);
364
365         RETURN(result);
366 out:
367         obd_device_free(newdev);
368 out_type:
369         class_put_type(type);
370         return result;
371 }
372
373 void class_release_dev(struct obd_device *obd)
374 {
375         struct obd_type *obd_type = obd->obd_type;
376
377         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
378                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
379         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
380                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
381         LASSERT(obd_type != NULL);
382
383         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
384                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
385
386         write_lock(&obd_dev_lock);
387         obd_devs[obd->obd_minor] = NULL;
388         write_unlock(&obd_dev_lock);
389         obd_device_free(obd);
390
391         class_put_type(obd_type);
392 }
393
394 int class_name2dev(const char *name)
395 {
396         int i;
397
398         if (!name)
399                 return -1;
400
401         read_lock(&obd_dev_lock);
402         for (i = 0; i < class_devno_max(); i++) {
403                 struct obd_device *obd = class_num2obd(i);
404
405                 if (obd && strcmp(name, obd->obd_name) == 0) {
406                         /* Make sure we finished attaching before we give
407                            out any references */
408                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
409                         if (obd->obd_attached) {
410                                 read_unlock(&obd_dev_lock);
411                                 return i;
412                         }
413                         break;
414                 }
415         }
416         read_unlock(&obd_dev_lock);
417
418         return -1;
419 }
420
421 struct obd_device *class_name2obd(const char *name)
422 {
423         int dev = class_name2dev(name);
424
425         if (dev < 0 || dev > class_devno_max())
426                 return NULL;
427         return class_num2obd(dev);
428 }
429 EXPORT_SYMBOL(class_name2obd);
430
431 int class_uuid2dev(struct obd_uuid *uuid)
432 {
433         int i;
434
435         read_lock(&obd_dev_lock);
436         for (i = 0; i < class_devno_max(); i++) {
437                 struct obd_device *obd = class_num2obd(i);
438
439                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
440                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
441                         read_unlock(&obd_dev_lock);
442                         return i;
443                 }
444         }
445         read_unlock(&obd_dev_lock);
446
447         return -1;
448 }
449
450 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
451 {
452         int dev = class_uuid2dev(uuid);
453         if (dev < 0)
454                 return NULL;
455         return class_num2obd(dev);
456 }
457 EXPORT_SYMBOL(class_uuid2obd);
458
459 /**
460  * Get obd device from ::obd_devs[]
461  *
462  * \param num [in] array index
463  *
464  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
465  *         otherwise return the obd device there.
466  */
467 struct obd_device *class_num2obd(int num)
468 {
469         struct obd_device *obd = NULL;
470
471         if (num < class_devno_max()) {
472                 obd = obd_devs[num];
473                 if (obd == NULL)
474                         return NULL;
475
476                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
477                          "%p obd_magic %08x != %08x\n",
478                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
479                 LASSERTF(obd->obd_minor == num,
480                          "%p obd_minor %0d != %0d\n",
481                          obd, obd->obd_minor, num);
482         }
483
484         return obd;
485 }
486
487 /**
488  * Get obd devices count. Device in any
489  *    state are counted
490  * \retval obd device count
491  */
492 int get_devices_count(void)
493 {
494         int index, max_index = class_devno_max(), dev_count = 0;
495
496         read_lock(&obd_dev_lock);
497         for (index = 0; index <= max_index; index++) {
498                 struct obd_device *obd = class_num2obd(index);
499                 if (obd != NULL)
500                         dev_count++;
501         }
502         read_unlock(&obd_dev_lock);
503
504         return dev_count;
505 }
506 EXPORT_SYMBOL(get_devices_count);
507
508 void class_obd_list(void)
509 {
510         char *status;
511         int i;
512
513         read_lock(&obd_dev_lock);
514         for (i = 0; i < class_devno_max(); i++) {
515                 struct obd_device *obd = class_num2obd(i);
516
517                 if (obd == NULL)
518                         continue;
519                 if (obd->obd_stopping)
520                         status = "ST";
521                 else if (obd->obd_set_up)
522                         status = "UP";
523                 else if (obd->obd_attached)
524                         status = "AT";
525                 else
526                         status = "--";
527                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
528                          i, status, obd->obd_type->typ_name,
529                          obd->obd_name, obd->obd_uuid.uuid,
530                          atomic_read(&obd->obd_refcount));
531         }
532         read_unlock(&obd_dev_lock);
533         return;
534 }
535
536 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
537    specified, then only the client with that uuid is returned,
538    otherwise any client connected to the tgt is returned. */
539 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
540                                           const char * typ_name,
541                                           struct obd_uuid *grp_uuid)
542 {
543         int i;
544
545         read_lock(&obd_dev_lock);
546         for (i = 0; i < class_devno_max(); i++) {
547                 struct obd_device *obd = class_num2obd(i);
548
549                 if (obd == NULL)
550                         continue;
551                 if ((strncmp(obd->obd_type->typ_name, typ_name,
552                              strlen(typ_name)) == 0)) {
553                         if (obd_uuid_equals(tgt_uuid,
554                                             &obd->u.cli.cl_target_uuid) &&
555                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
556                                                          &obd->obd_uuid) : 1)) {
557                                 read_unlock(&obd_dev_lock);
558                                 return obd;
559                         }
560                 }
561         }
562         read_unlock(&obd_dev_lock);
563
564         return NULL;
565 }
566 EXPORT_SYMBOL(class_find_client_obd);
567
568 /* Iterate the obd_device list looking devices have grp_uuid. Start
569    searching at *next, and if a device is found, the next index to look
570    at is saved in *next. If next is NULL, then the first matching device
571    will always be returned. */
572 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
573 {
574         int i;
575
576         if (next == NULL)
577                 i = 0;
578         else if (*next >= 0 && *next < class_devno_max())
579                 i = *next;
580         else
581                 return NULL;
582
583         read_lock(&obd_dev_lock);
584         for (; i < class_devno_max(); i++) {
585                 struct obd_device *obd = class_num2obd(i);
586
587                 if (obd == NULL)
588                         continue;
589                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
590                         if (next != NULL)
591                                 *next = i+1;
592                         read_unlock(&obd_dev_lock);
593                         return obd;
594                 }
595         }
596         read_unlock(&obd_dev_lock);
597
598         return NULL;
599 }
600 EXPORT_SYMBOL(class_devices_in_group);
601
602 /**
603  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
604  * adjust sptlrpc settings accordingly.
605  */
606 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
607 {
608         struct obd_device  *obd;
609         const char         *type;
610         int                 i, rc = 0, rc2;
611
612         LASSERT(namelen > 0);
613
614         read_lock(&obd_dev_lock);
615         for (i = 0; i < class_devno_max(); i++) {
616                 obd = class_num2obd(i);
617
618                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
619                         continue;
620
621                 /* only notify mdc, osc, mdt, ost */
622                 type = obd->obd_type->typ_name;
623                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
624                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
625                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
626                     strcmp(type, LUSTRE_OST_NAME) != 0)
627                         continue;
628
629                 if (strncmp(obd->obd_name, fsname, namelen))
630                         continue;
631
632                 class_incref(obd, __FUNCTION__, obd);
633                 read_unlock(&obd_dev_lock);
634                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
635                                          sizeof(KEY_SPTLRPC_CONF),
636                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
637                 rc = rc ? rc : rc2;
638                 class_decref(obd, __FUNCTION__, obd);
639                 read_lock(&obd_dev_lock);
640         }
641         read_unlock(&obd_dev_lock);
642         return rc;
643 }
644 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
645
646 void obd_cleanup_caches(void)
647 {
648         ENTRY;
649         if (obd_device_cachep) {
650                 kmem_cache_destroy(obd_device_cachep);
651                 obd_device_cachep = NULL;
652         }
653         if (obdo_cachep) {
654                 kmem_cache_destroy(obdo_cachep);
655                 obdo_cachep = NULL;
656         }
657         if (import_cachep) {
658                 kmem_cache_destroy(import_cachep);
659                 import_cachep = NULL;
660         }
661         if (capa_cachep) {
662                 kmem_cache_destroy(capa_cachep);
663                 capa_cachep = NULL;
664         }
665         EXIT;
666 }
667
668 int obd_init_caches(void)
669 {
670         int rc;
671         ENTRY;
672
673         LASSERT(obd_device_cachep == NULL);
674         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
675                                               sizeof(struct obd_device),
676                                               0, 0, NULL);
677         if (!obd_device_cachep)
678                 GOTO(out, rc = -ENOMEM);
679
680         LASSERT(obdo_cachep == NULL);
681         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
682                                         0, 0, NULL);
683         if (!obdo_cachep)
684                 GOTO(out, rc = -ENOMEM);
685
686         LASSERT(import_cachep == NULL);
687         import_cachep = kmem_cache_create("ll_import_cache",
688                                           sizeof(struct obd_import),
689                                           0, 0, NULL);
690         if (!import_cachep)
691                 GOTO(out, rc = -ENOMEM);
692
693         LASSERT(capa_cachep == NULL);
694         capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
695                                         0, 0, NULL);
696         if (!capa_cachep)
697                 GOTO(out, rc = -ENOMEM);
698
699         RETURN(0);
700 out:
701         obd_cleanup_caches();
702         RETURN(rc);
703 }
704
705 /* map connection to client */
706 struct obd_export *class_conn2export(struct lustre_handle *conn)
707 {
708         struct obd_export *export;
709         ENTRY;
710
711         if (!conn) {
712                 CDEBUG(D_CACHE, "looking for null handle\n");
713                 RETURN(NULL);
714         }
715
716         if (conn->cookie == -1) {  /* this means assign a new connection */
717                 CDEBUG(D_CACHE, "want a new connection\n");
718                 RETURN(NULL);
719         }
720
721         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
722         export = class_handle2object(conn->cookie, NULL);
723         RETURN(export);
724 }
725 EXPORT_SYMBOL(class_conn2export);
726
727 struct obd_device *class_exp2obd(struct obd_export *exp)
728 {
729         if (exp)
730                 return exp->exp_obd;
731         return NULL;
732 }
733 EXPORT_SYMBOL(class_exp2obd);
734
735 struct obd_device *class_conn2obd(struct lustre_handle *conn)
736 {
737         struct obd_export *export;
738         export = class_conn2export(conn);
739         if (export) {
740                 struct obd_device *obd = export->exp_obd;
741                 class_export_put(export);
742                 return obd;
743         }
744         return NULL;
745 }
746
747 struct obd_import *class_exp2cliimp(struct obd_export *exp)
748 {
749         struct obd_device *obd = exp->exp_obd;
750         if (obd == NULL)
751                 return NULL;
752         return obd->u.cli.cl_import;
753 }
754 EXPORT_SYMBOL(class_exp2cliimp);
755
756 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
757 {
758         struct obd_device *obd = class_conn2obd(conn);
759         if (obd == NULL)
760                 return NULL;
761         return obd->u.cli.cl_import;
762 }
763
764 /* Export management functions */
765 static void class_export_destroy(struct obd_export *exp)
766 {
767         struct obd_device *obd = exp->exp_obd;
768         ENTRY;
769
770         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
771         LASSERT(obd != NULL);
772
773         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
774                exp->exp_client_uuid.uuid, obd->obd_name);
775
776         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
777         if (exp->exp_connection)
778                 ptlrpc_put_connection_superhack(exp->exp_connection);
779
780         LASSERT(list_empty(&exp->exp_outstanding_replies));
781         LASSERT(list_empty(&exp->exp_uncommitted_replies));
782         LASSERT(list_empty(&exp->exp_req_replay_queue));
783         LASSERT(list_empty(&exp->exp_hp_rpcs));
784         obd_destroy_export(exp);
785         class_decref(obd, "export", exp);
786
787         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
788         EXIT;
789 }
790
/* hop_addref callback of export_handle_ops: take a reference on the export. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
795
796 static struct portals_handle_ops export_handle_ops = {
797         .hop_addref = export_handle_addref,
798         .hop_free   = NULL,
799 };
800
801 struct obd_export *class_export_get(struct obd_export *exp)
802 {
803         atomic_inc(&exp->exp_refcount);
804         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
805                atomic_read(&exp->exp_refcount));
806         return exp;
807 }
808 EXPORT_SYMBOL(class_export_get);
809
810 void class_export_put(struct obd_export *exp)
811 {
812         LASSERT(exp != NULL);
813         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
814         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
815                atomic_read(&exp->exp_refcount) - 1);
816
817         if (atomic_dec_and_test(&exp->exp_refcount)) {
818                 LASSERT(!list_empty(&exp->exp_obd_chain));
819                 CDEBUG(D_IOCTL, "final put %p/%s\n",
820                        exp, exp->exp_client_uuid.uuid);
821
822                 /* release nid stat refererence */
823                 lprocfs_exp_cleanup(exp);
824
825                 obd_zombie_export_add(exp);
826         }
827 }
828 EXPORT_SYMBOL(class_export_put);
829
830 /* Creates a new export, adds it to the hash table, and returns a
831  * pointer to it. The refcount is 2: one for the hash reference, and
832  * one for the pointer returned by this function. */
833 struct obd_export *class_new_export(struct obd_device *obd,
834                                     struct obd_uuid *cluuid)
835 {
836         struct obd_export *export;
837         cfs_hash_t *hash = NULL;
838         int rc = 0;
839         ENTRY;
840
841         OBD_ALLOC_PTR(export);
842         if (!export)
843                 return ERR_PTR(-ENOMEM);
844
845         export->exp_conn_cnt = 0;
846         export->exp_lock_hash = NULL;
847         export->exp_flock_hash = NULL;
848         atomic_set(&export->exp_refcount, 2);
849         atomic_set(&export->exp_rpc_count, 0);
850         atomic_set(&export->exp_cb_count, 0);
851         atomic_set(&export->exp_locks_count, 0);
852 #if LUSTRE_TRACKS_LOCK_EXP_REFS
853         INIT_LIST_HEAD(&export->exp_locks_list);
854         spin_lock_init(&export->exp_locks_list_guard);
855 #endif
856         atomic_set(&export->exp_replay_count, 0);
857         export->exp_obd = obd;
858         INIT_LIST_HEAD(&export->exp_outstanding_replies);
859         spin_lock_init(&export->exp_uncommitted_replies_lock);
860         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
861         INIT_LIST_HEAD(&export->exp_req_replay_queue);
862         INIT_LIST_HEAD(&export->exp_handle.h_link);
863         INIT_LIST_HEAD(&export->exp_hp_rpcs);
864         INIT_LIST_HEAD(&export->exp_reg_rpcs);
865         class_handle_hash(&export->exp_handle, &export_handle_ops);
866         export->exp_last_request_time = cfs_time_current_sec();
867         spin_lock_init(&export->exp_lock);
868         spin_lock_init(&export->exp_rpc_lock);
869         INIT_HLIST_NODE(&export->exp_uuid_hash);
870         INIT_HLIST_NODE(&export->exp_nid_hash);
871         spin_lock_init(&export->exp_bl_list_lock);
872         INIT_LIST_HEAD(&export->exp_bl_list);
873
874         export->exp_sp_peer = LUSTRE_SP_ANY;
875         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
876         export->exp_client_uuid = *cluuid;
877         obd_init_export(export);
878
879         spin_lock(&obd->obd_dev_lock);
880         /* shouldn't happen, but might race */
881         if (obd->obd_stopping)
882                 GOTO(exit_unlock, rc = -ENODEV);
883
884         hash = cfs_hash_getref(obd->obd_uuid_hash);
885         if (hash == NULL)
886                 GOTO(exit_unlock, rc = -ENODEV);
887         spin_unlock(&obd->obd_dev_lock);
888
889         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
890                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
891                 if (rc != 0) {
892                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
893                                       obd->obd_name, cluuid->uuid, rc);
894                         GOTO(exit_err, rc = -EALREADY);
895                 }
896         }
897
898         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
899         spin_lock(&obd->obd_dev_lock);
900         if (obd->obd_stopping) {
901                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
902                 GOTO(exit_unlock, rc = -ENODEV);
903         }
904
905         class_incref(obd, "export", export);
906         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
907         list_add_tail(&export->exp_obd_chain_timed,
908                       &export->exp_obd->obd_exports_timed);
909         export->exp_obd->obd_num_exports++;
910         spin_unlock(&obd->obd_dev_lock);
911         cfs_hash_putref(hash);
912         RETURN(export);
913
914 exit_unlock:
915         spin_unlock(&obd->obd_dev_lock);
916 exit_err:
917         if (hash)
918                 cfs_hash_putref(hash);
919         class_handle_unhash(&export->exp_handle);
920         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
921         obd_destroy_export(export);
922         OBD_FREE_PTR(export);
923         return ERR_PTR(rc);
924 }
925 EXPORT_SYMBOL(class_new_export);
926
927 void class_unlink_export(struct obd_export *exp)
928 {
929         class_handle_unhash(&exp->exp_handle);
930
931         spin_lock(&exp->exp_obd->obd_dev_lock);
932         /* delete an uuid-export hashitem from hashtables */
933         if (!hlist_unhashed(&exp->exp_uuid_hash))
934                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
935                              &exp->exp_client_uuid,
936                              &exp->exp_uuid_hash);
937
938         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
939         list_del_init(&exp->exp_obd_chain_timed);
940         exp->exp_obd->obd_num_exports--;
941         spin_unlock(&exp->exp_obd->obd_dev_lock);
942         class_export_put(exp);
943 }
944
945 /* Import management functions */
946 static void class_import_destroy(struct obd_import *imp)
947 {
948         ENTRY;
949
950         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
951                 imp->imp_obd->obd_name);
952
953         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
954
955         ptlrpc_put_connection_superhack(imp->imp_connection);
956
957         while (!list_empty(&imp->imp_conn_list)) {
958                 struct obd_import_conn *imp_conn;
959
960                 imp_conn = list_entry(imp->imp_conn_list.next,
961                                       struct obd_import_conn, oic_item);
962                 list_del_init(&imp_conn->oic_item);
963                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
964                 OBD_FREE(imp_conn, sizeof(*imp_conn));
965         }
966
967         LASSERT(imp->imp_sec == NULL);
968         class_decref(imp->imp_obd, "import", imp);
969         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
970         EXIT;
971 }
972
/* Handle-table callback: take a reference on the import behind \a import. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
977
978 static struct portals_handle_ops import_handle_ops = {
979         .hop_addref = import_handle_addref,
980         .hop_free   = NULL,
981 };
982
983 struct obd_import *class_import_get(struct obd_import *import)
984 {
985         atomic_inc(&import->imp_refcount);
986         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
987                atomic_read(&import->imp_refcount),
988                import->imp_obd->obd_name);
989         return import;
990 }
991 EXPORT_SYMBOL(class_import_get);
992
993 void class_import_put(struct obd_import *imp)
994 {
995         ENTRY;
996
997         LASSERT(list_empty(&imp->imp_zombie_chain));
998         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
999
1000         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1001                atomic_read(&imp->imp_refcount) - 1,
1002                imp->imp_obd->obd_name);
1003
1004         if (atomic_dec_and_test(&imp->imp_refcount)) {
1005                 CDEBUG(D_INFO, "final put import %p\n", imp);
1006                 obd_zombie_import_add(imp);
1007         }
1008
1009         /* catch possible import put race */
1010         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1011         EXIT;
1012 }
1013 EXPORT_SYMBOL(class_import_put);
1014
1015 static void init_imp_at(struct imp_at *at) {
1016         int i;
1017         at_init(&at->iat_net_latency, 0, 0);
1018         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1019                 /* max service estimates are tracked on the server side, so
1020                    don't use the AT history here, just use the last reported
1021                    val. (But keep hist for proc histogram, worst_ever) */
1022                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1023                         AT_FLG_NOHIST);
1024         }
1025 }
1026
1027 struct obd_import *class_new_import(struct obd_device *obd)
1028 {
1029         struct obd_import *imp;
1030
1031         OBD_ALLOC(imp, sizeof(*imp));
1032         if (imp == NULL)
1033                 return NULL;
1034
1035         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1036         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1037         INIT_LIST_HEAD(&imp->imp_replay_list);
1038         INIT_LIST_HEAD(&imp->imp_sending_list);
1039         INIT_LIST_HEAD(&imp->imp_delayed_list);
1040         INIT_LIST_HEAD(&imp->imp_committed_list);
1041         imp->imp_replay_cursor = &imp->imp_committed_list;
1042         spin_lock_init(&imp->imp_lock);
1043         imp->imp_last_success_conn = 0;
1044         imp->imp_state = LUSTRE_IMP_NEW;
1045         imp->imp_obd = class_incref(obd, "import", imp);
1046         mutex_init(&imp->imp_sec_mutex);
1047         init_waitqueue_head(&imp->imp_recovery_waitq);
1048
1049         atomic_set(&imp->imp_refcount, 2);
1050         atomic_set(&imp->imp_unregistering, 0);
1051         atomic_set(&imp->imp_inflight, 0);
1052         atomic_set(&imp->imp_replay_inflight, 0);
1053         atomic_set(&imp->imp_inval_count, 0);
1054         INIT_LIST_HEAD(&imp->imp_conn_list);
1055         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1056         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1057         init_imp_at(&imp->imp_at);
1058
1059         /* the default magic is V2, will be used in connect RPC, and
1060          * then adjusted according to the flags in request/reply. */
1061         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1062
1063         return imp;
1064 }
1065 EXPORT_SYMBOL(class_new_import);
1066
1067 void class_destroy_import(struct obd_import *import)
1068 {
1069         LASSERT(import != NULL);
1070         LASSERT(import != LP_POISON);
1071
1072         class_handle_unhash(&import->imp_handle);
1073
1074         spin_lock(&import->imp_lock);
1075         import->imp_generation++;
1076         spin_unlock(&import->imp_lock);
1077         class_import_put(import);
1078 }
1079 EXPORT_SYMBOL(class_destroy_import);
1080
1081 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1082
1083 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1084 {
1085         spin_lock(&exp->exp_locks_list_guard);
1086
1087         LASSERT(lock->l_exp_refs_nr >= 0);
1088
1089         if (lock->l_exp_refs_target != NULL &&
1090             lock->l_exp_refs_target != exp) {
1091                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1092                               exp, lock, lock->l_exp_refs_target);
1093         }
1094         if ((lock->l_exp_refs_nr ++) == 0) {
1095                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1096                 lock->l_exp_refs_target = exp;
1097         }
1098         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1099                lock, exp, lock->l_exp_refs_nr);
1100         spin_unlock(&exp->exp_locks_list_guard);
1101 }
1102
1103 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1104 {
1105         spin_lock(&exp->exp_locks_list_guard);
1106         LASSERT(lock->l_exp_refs_nr > 0);
1107         if (lock->l_exp_refs_target != exp) {
1108                 LCONSOLE_WARN("lock %p, "
1109                               "mismatching export pointers: %p, %p\n",
1110                               lock, lock->l_exp_refs_target, exp);
1111         }
1112         if (-- lock->l_exp_refs_nr == 0) {
1113                 list_del_init(&lock->l_exp_refs_link);
1114                 lock->l_exp_refs_target = NULL;
1115         }
1116         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1117                lock, exp, lock->l_exp_refs_nr);
1118         spin_unlock(&exp->exp_locks_list_guard);
1119 }
1120 #endif
1121
1122 /* A connection defines an export context in which preallocation can
1123    be managed. This releases the export pointer reference, and returns
1124    the export handle, so the export refcount is 1 when this function
1125    returns. */
1126 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1127                   struct obd_uuid *cluuid)
1128 {
1129         struct obd_export *export;
1130         LASSERT(conn != NULL);
1131         LASSERT(obd != NULL);
1132         LASSERT(cluuid != NULL);
1133         ENTRY;
1134
1135         export = class_new_export(obd, cluuid);
1136         if (IS_ERR(export))
1137                 RETURN(PTR_ERR(export));
1138
1139         conn->cookie = export->exp_handle.h_cookie;
1140         class_export_put(export);
1141
1142         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1143                cluuid->uuid, conn->cookie);
1144         RETURN(0);
1145 }
1146 EXPORT_SYMBOL(class_connect);
1147
1148 /* if export is involved in recovery then clean up related things */
1149 static void class_export_recovery_cleanup(struct obd_export *exp)
1150 {
1151         struct obd_device *obd = exp->exp_obd;
1152
1153         spin_lock(&obd->obd_recovery_task_lock);
1154         if (obd->obd_recovering) {
1155                 if (exp->exp_in_recovery) {
1156                         spin_lock(&exp->exp_lock);
1157                         exp->exp_in_recovery = 0;
1158                         spin_unlock(&exp->exp_lock);
1159                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1160                         atomic_dec(&obd->obd_connected_clients);
1161                 }
1162
1163                 /* if called during recovery then should update
1164                  * obd_stale_clients counter,
1165                  * lightweight exports are not counted */
1166                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1167                         exp->exp_obd->obd_stale_clients++;
1168         }
1169         spin_unlock(&obd->obd_recovery_task_lock);
1170
1171         spin_lock(&exp->exp_lock);
1172         /** Cleanup req replay fields */
1173         if (exp->exp_req_replay_needed) {
1174                 exp->exp_req_replay_needed = 0;
1175
1176                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1177                 atomic_dec(&obd->obd_req_replay_clients);
1178         }
1179
1180         /** Cleanup lock replay data */
1181         if (exp->exp_lock_replay_needed) {
1182                 exp->exp_lock_replay_needed = 0;
1183
1184                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1185                 atomic_dec(&obd->obd_lock_replay_clients);
1186         }
1187         spin_unlock(&exp->exp_lock);
1188 }
1189
1190 /* This function removes 1-3 references from the export:
1191  * 1 - for export pointer passed
1192  * and if disconnect really need
1193  * 2 - removing from hash
1194  * 3 - in client_unlink_export
1195  * The export pointer passed to this function can destroyed */
1196 int class_disconnect(struct obd_export *export)
1197 {
1198         int already_disconnected;
1199         ENTRY;
1200
1201         if (export == NULL) {
1202                 CWARN("attempting to free NULL export %p\n", export);
1203                 RETURN(-EINVAL);
1204         }
1205
1206         spin_lock(&export->exp_lock);
1207         already_disconnected = export->exp_disconnected;
1208         export->exp_disconnected = 1;
1209         spin_unlock(&export->exp_lock);
1210
1211         /* class_cleanup(), abort_recovery(), and class_fail_export()
1212          * all end up in here, and if any of them race we shouldn't
1213          * call extra class_export_puts(). */
1214         if (already_disconnected) {
1215                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1216                 GOTO(no_disconn, already_disconnected);
1217         }
1218
1219         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1220                export->exp_handle.h_cookie);
1221
1222         if (!hlist_unhashed(&export->exp_nid_hash))
1223                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1224                              &export->exp_connection->c_peer.nid,
1225                              &export->exp_nid_hash);
1226
1227         class_export_recovery_cleanup(export);
1228         class_unlink_export(export);
1229 no_disconn:
1230         class_export_put(export);
1231         RETURN(0);
1232 }
1233 EXPORT_SYMBOL(class_disconnect);
1234
1235 /* Return non-zero for a fully connected export */
1236 int class_connected_export(struct obd_export *exp)
1237 {
1238         int connected = 0;
1239
1240         if (exp) {
1241                 spin_lock(&exp->exp_lock);
1242                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1243                 spin_unlock(&exp->exp_lock);
1244         }
1245         return connected;
1246 }
1247 EXPORT_SYMBOL(class_connected_export);
1248
1249 static void class_disconnect_export_list(struct list_head *list,
1250                                          enum obd_option flags)
1251 {
1252         int rc;
1253         struct obd_export *exp;
1254         ENTRY;
1255
1256         /* It's possible that an export may disconnect itself, but
1257          * nothing else will be added to this list. */
1258         while (!list_empty(list)) {
1259                 exp = list_entry(list->next, struct obd_export,
1260                                  exp_obd_chain);
1261                 /* need for safe call CDEBUG after obd_disconnect */
1262                 class_export_get(exp);
1263
1264                 spin_lock(&exp->exp_lock);
1265                 exp->exp_flags = flags;
1266                 spin_unlock(&exp->exp_lock);
1267
1268                 if (obd_uuid_equals(&exp->exp_client_uuid,
1269                                     &exp->exp_obd->obd_uuid)) {
1270                         CDEBUG(D_HA,
1271                                "exp %p export uuid == obd uuid, don't discon\n",
1272                                exp);
1273                         /* Need to delete this now so we don't end up pointing
1274                          * to work_list later when this export is cleaned up. */
1275                         list_del_init(&exp->exp_obd_chain);
1276                         class_export_put(exp);
1277                         continue;
1278                 }
1279
1280                 class_export_get(exp);
1281                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1282                        "last request at "CFS_TIME_T"\n",
1283                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1284                        exp, exp->exp_last_request_time);
1285                 /* release one export reference anyway */
1286                 rc = obd_disconnect(exp);
1287
1288                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1289                        obd_export_nid2str(exp), exp, rc);
1290                 class_export_put(exp);
1291         }
1292         EXIT;
1293 }
1294
1295 void class_disconnect_exports(struct obd_device *obd)
1296 {
1297         struct list_head work_list;
1298         ENTRY;
1299
1300         /* Move all of the exports from obd_exports to a work list, en masse. */
1301         INIT_LIST_HEAD(&work_list);
1302         spin_lock(&obd->obd_dev_lock);
1303         list_splice_init(&obd->obd_exports, &work_list);
1304         list_splice_init(&obd->obd_delayed_exports, &work_list);
1305         spin_unlock(&obd->obd_dev_lock);
1306
1307         if (!list_empty(&work_list)) {
1308                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1309                        "disconnecting them\n", obd->obd_minor, obd);
1310                 class_disconnect_export_list(&work_list,
1311                                              exp_flags_from_obd(obd));
1312         } else
1313                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1314                        obd->obd_minor, obd);
1315         EXIT;
1316 }
1317 EXPORT_SYMBOL(class_disconnect_exports);
1318
1319 /* Remove exports that have not completed recovery.
1320  */
1321 void class_disconnect_stale_exports(struct obd_device *obd,
1322                                     int (*test_export)(struct obd_export *))
1323 {
1324         struct list_head work_list;
1325         struct obd_export *exp, *n;
1326         int evicted = 0;
1327         ENTRY;
1328
1329         INIT_LIST_HEAD(&work_list);
1330         spin_lock(&obd->obd_dev_lock);
1331         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1332                                  exp_obd_chain) {
1333                 /* don't count self-export as client */
1334                 if (obd_uuid_equals(&exp->exp_client_uuid,
1335                                     &exp->exp_obd->obd_uuid))
1336                         continue;
1337
1338                 /* don't evict clients which have no slot in last_rcvd
1339                  * (e.g. lightweight connection) */
1340                 if (exp->exp_target_data.ted_lr_idx == -1)
1341                         continue;
1342
1343                 spin_lock(&exp->exp_lock);
1344                 if (exp->exp_failed || test_export(exp)) {
1345                         spin_unlock(&exp->exp_lock);
1346                         continue;
1347                 }
1348                 exp->exp_failed = 1;
1349                 spin_unlock(&exp->exp_lock);
1350
1351                 list_move(&exp->exp_obd_chain, &work_list);
1352                 evicted++;
1353                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1354                        obd->obd_name, exp->exp_client_uuid.uuid,
1355                        exp->exp_connection == NULL ? "<unknown>" :
1356                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1357                 print_export_data(exp, "EVICTING", 0);
1358         }
1359         spin_unlock(&obd->obd_dev_lock);
1360
1361         if (evicted)
1362                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1363                               obd->obd_name, evicted);
1364
1365         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1366                                                  OBD_OPT_ABORT_RECOV);
1367         EXIT;
1368 }
1369 EXPORT_SYMBOL(class_disconnect_stale_exports);
1370
1371 void class_fail_export(struct obd_export *exp)
1372 {
1373         int rc, already_failed;
1374
1375         spin_lock(&exp->exp_lock);
1376         already_failed = exp->exp_failed;
1377         exp->exp_failed = 1;
1378         spin_unlock(&exp->exp_lock);
1379
1380         if (already_failed) {
1381                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1382                        exp, exp->exp_client_uuid.uuid);
1383                 return;
1384         }
1385
1386         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1387                exp, exp->exp_client_uuid.uuid);
1388
1389         if (obd_dump_on_timeout)
1390                 libcfs_debug_dumplog();
1391
1392         /* need for safe call CDEBUG after obd_disconnect */
1393         class_export_get(exp);
1394
1395         /* Most callers into obd_disconnect are removing their own reference
1396          * (request, for example) in addition to the one from the hash table.
1397          * We don't have such a reference here, so make one. */
1398         class_export_get(exp);
1399         rc = obd_disconnect(exp);
1400         if (rc)
1401                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1402         else
1403                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1404                        exp, exp->exp_client_uuid.uuid);
1405         class_export_put(exp);
1406 }
1407 EXPORT_SYMBOL(class_fail_export);
1408
1409 char *obd_export_nid2str(struct obd_export *exp)
1410 {
1411         if (exp->exp_connection != NULL)
1412                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1413
1414         return "(no nid)";
1415 }
1416 EXPORT_SYMBOL(obd_export_nid2str);
1417
1418 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1419 {
1420         cfs_hash_t *nid_hash;
1421         struct obd_export *doomed_exp = NULL;
1422         int exports_evicted = 0;
1423
1424         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1425
1426         spin_lock(&obd->obd_dev_lock);
1427         /* umount has run already, so evict thread should leave
1428          * its task to umount thread now */
1429         if (obd->obd_stopping) {
1430                 spin_unlock(&obd->obd_dev_lock);
1431                 return exports_evicted;
1432         }
1433         nid_hash = obd->obd_nid_hash;
1434         cfs_hash_getref(nid_hash);
1435         spin_unlock(&obd->obd_dev_lock);
1436
1437         do {
1438                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1439                 if (doomed_exp == NULL)
1440                         break;
1441
1442                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1443                          "nid %s found, wanted nid %s, requested nid %s\n",
1444                          obd_export_nid2str(doomed_exp),
1445                          libcfs_nid2str(nid_key), nid);
1446                 LASSERTF(doomed_exp != obd->obd_self_export,
1447                          "self-export is hashed by NID?\n");
1448                 exports_evicted++;
1449                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1450                               "request\n", obd->obd_name,
1451                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1452                               obd_export_nid2str(doomed_exp));
1453                 class_fail_export(doomed_exp);
1454                 class_export_put(doomed_exp);
1455         } while (1);
1456
1457         cfs_hash_putref(nid_hash);
1458
1459         if (!exports_evicted)
1460                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1461                        obd->obd_name, nid);
1462         return exports_evicted;
1463 }
1464 EXPORT_SYMBOL(obd_export_evict_by_nid);
1465
1466 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1467 {
1468         cfs_hash_t *uuid_hash;
1469         struct obd_export *doomed_exp = NULL;
1470         struct obd_uuid doomed_uuid;
1471         int exports_evicted = 0;
1472
1473         spin_lock(&obd->obd_dev_lock);
1474         if (obd->obd_stopping) {
1475                 spin_unlock(&obd->obd_dev_lock);
1476                 return exports_evicted;
1477         }
1478         uuid_hash = obd->obd_uuid_hash;
1479         cfs_hash_getref(uuid_hash);
1480         spin_unlock(&obd->obd_dev_lock);
1481
1482         obd_str2uuid(&doomed_uuid, uuid);
1483         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1484                 CERROR("%s: can't evict myself\n", obd->obd_name);
1485                 cfs_hash_putref(uuid_hash);
1486                 return exports_evicted;
1487         }
1488
1489         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1490
1491         if (doomed_exp == NULL) {
1492                 CERROR("%s: can't disconnect %s: no exports found\n",
1493                        obd->obd_name, uuid);
1494         } else {
1495                 CWARN("%s: evicting %s at adminstrative request\n",
1496                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1497                 class_fail_export(doomed_exp);
1498                 class_export_put(doomed_exp);
1499                 exports_evicted++;
1500         }
1501         cfs_hash_putref(uuid_hash);
1502
1503         return exports_evicted;
1504 }
1505
1506 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1507 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1508 #endif
1509
1510 static void print_export_data(struct obd_export *exp, const char *status,
1511                               int locks)
1512 {
1513         struct ptlrpc_reply_state *rs;
1514         struct ptlrpc_reply_state *first_reply = NULL;
1515         int nreplies = 0;
1516
1517         spin_lock(&exp->exp_lock);
1518         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1519                             rs_exp_list) {
1520                 if (nreplies == 0)
1521                         first_reply = rs;
1522                 nreplies++;
1523         }
1524         spin_unlock(&exp->exp_lock);
1525
1526         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1527                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1528                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1529                atomic_read(&exp->exp_rpc_count),
1530                atomic_read(&exp->exp_cb_count),
1531                atomic_read(&exp->exp_locks_count),
1532                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1533                nreplies, first_reply, nreplies > 3 ? "..." : "",
1534                exp->exp_last_committed);
1535 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1536         if (locks && class_export_dump_hook != NULL)
1537                 class_export_dump_hook(exp);
1538 #endif
1539 }
1540
1541 void dump_exports(struct obd_device *obd, int locks)
1542 {
1543         struct obd_export *exp;
1544
1545         spin_lock(&obd->obd_dev_lock);
1546         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1547                 print_export_data(exp, "ACTIVE", locks);
1548         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1549                 print_export_data(exp, "UNLINKED", locks);
1550         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1551                 print_export_data(exp, "DELAYED", locks);
1552         spin_unlock(&obd->obd_dev_lock);
1553         spin_lock(&obd_zombie_impexp_lock);
1554         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1555                 print_export_data(exp, "ZOMBIE", locks);
1556         spin_unlock(&obd_zombie_impexp_lock);
1557 }
1558
1559 void obd_exports_barrier(struct obd_device *obd)
1560 {
1561         int waited = 2;
1562         LASSERT(list_empty(&obd->obd_exports));
1563         spin_lock(&obd->obd_dev_lock);
1564         while (!list_empty(&obd->obd_unlinked_exports)) {
1565                 spin_unlock(&obd->obd_dev_lock);
1566                 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1567                                                    cfs_time_seconds(waited));
1568                 if (waited > 5 && IS_PO2(waited)) {
1569                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1570                                       "more than %d seconds. "
1571                                       "The obd refcount = %d. Is it stuck?\n",
1572                                       obd->obd_name, waited,
1573                                       atomic_read(&obd->obd_refcount));
1574                         dump_exports(obd, 1);
1575                 }
1576                 waited *= 2;
1577                 spin_lock(&obd->obd_dev_lock);
1578         }
1579         spin_unlock(&obd->obd_dev_lock);
1580 }
1581 EXPORT_SYMBOL(obd_exports_barrier);
1582
/* Number of zombie imports and exports still waiting to be destroyed;
 * updated under obd_zombie_impexp_lock.  Starts at zero. */
static int zombies_count;
1585
1586 /**
1587  * kill zombie imports and exports
1588  */
1589 void obd_zombie_impexp_cull(void)
1590 {
1591         struct obd_import *import;
1592         struct obd_export *export;
1593         ENTRY;
1594
1595         do {
1596                 spin_lock(&obd_zombie_impexp_lock);
1597
1598                 import = NULL;
1599                 if (!list_empty(&obd_zombie_imports)) {
1600                         import = list_entry(obd_zombie_imports.next,
1601                                             struct obd_import,
1602                                             imp_zombie_chain);
1603                         list_del_init(&import->imp_zombie_chain);
1604                 }
1605
1606                 export = NULL;
1607                 if (!list_empty(&obd_zombie_exports)) {
1608                         export = list_entry(obd_zombie_exports.next,
1609                                             struct obd_export,
1610                                             exp_obd_chain);
1611                         list_del_init(&export->exp_obd_chain);
1612                 }
1613
1614                 spin_unlock(&obd_zombie_impexp_lock);
1615
1616                 if (import != NULL) {
1617                         class_import_destroy(import);
1618                         spin_lock(&obd_zombie_impexp_lock);
1619                         zombies_count--;
1620                         spin_unlock(&obd_zombie_impexp_lock);
1621                 }
1622
1623                 if (export != NULL) {
1624                         class_export_destroy(export);
1625                         spin_lock(&obd_zombie_impexp_lock);
1626                         zombies_count--;
1627                         spin_unlock(&obd_zombie_impexp_lock);
1628                 }
1629
1630                 cond_resched();
1631         } while (import != NULL || export != NULL);
1632         EXIT;
1633 }
1634
1635 static struct completion        obd_zombie_start;
1636 static struct completion        obd_zombie_stop;
1637 static unsigned long            obd_zombie_flags;
1638 static wait_queue_head_t        obd_zombie_waitq;
1639 static pid_t                    obd_zombie_pid;
1640
1641 enum {
1642         OBD_ZOMBIE_STOP         = 0x0001,
1643 };
1644
1645 /**
1646  * check for work for kill zombie import/export thread.
1647  */
1648 static int obd_zombie_impexp_check(void *arg)
1649 {
1650         int rc;
1651
1652         spin_lock(&obd_zombie_impexp_lock);
1653         rc = (zombies_count == 0) &&
1654              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1655         spin_unlock(&obd_zombie_impexp_lock);
1656
1657         RETURN(rc);
1658 }
1659
1660 /**
1661  * Add export to the obd_zombe thread and notify it.
1662  */
1663 static void obd_zombie_export_add(struct obd_export *exp) {
1664         spin_lock(&exp->exp_obd->obd_dev_lock);
1665         LASSERT(!list_empty(&exp->exp_obd_chain));
1666         list_del_init(&exp->exp_obd_chain);
1667         spin_unlock(&exp->exp_obd->obd_dev_lock);
1668         spin_lock(&obd_zombie_impexp_lock);
1669         zombies_count++;
1670         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1671         spin_unlock(&obd_zombie_impexp_lock);
1672
1673         obd_zombie_impexp_notify();
1674 }
1675
1676 /**
1677  * Add import to the obd_zombe thread and notify it.
1678  */
1679 static void obd_zombie_import_add(struct obd_import *imp) {
1680         LASSERT(imp->imp_sec == NULL);
1681         LASSERT(imp->imp_rq_pool == NULL);
1682         spin_lock(&obd_zombie_impexp_lock);
1683         LASSERT(list_empty(&imp->imp_zombie_chain));
1684         zombies_count++;
1685         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1686         spin_unlock(&obd_zombie_impexp_lock);
1687
1688         obd_zombie_impexp_notify();
1689 }
1690
1691 /**
1692  * notify import/export destroy thread about new zombie.
1693  */
1694 static void obd_zombie_impexp_notify(void)
1695 {
1696         /*
1697          * Make sure obd_zomebie_impexp_thread get this notification.
1698          * It is possible this signal only get by obd_zombie_barrier, and
1699          * barrier gulps this notification and sleeps away and hangs ensues
1700          */
1701         wake_up_all(&obd_zombie_waitq);
1702 }
1703
1704 /**
1705  * check whether obd_zombie is idle
1706  */
1707 static int obd_zombie_is_idle(void)
1708 {
1709         int rc;
1710
1711         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1712         spin_lock(&obd_zombie_impexp_lock);
1713         rc = (zombies_count == 0);
1714         spin_unlock(&obd_zombie_impexp_lock);
1715         return rc;
1716 }
1717
1718 /**
1719  * wait when obd_zombie import/export queues become empty
1720  */
1721 void obd_zombie_barrier(void)
1722 {
1723         struct l_wait_info lwi = { 0 };
1724
1725         if (obd_zombie_pid == current_pid())
1726                 /* don't wait for myself */
1727                 return;
1728         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1729 }
1730 EXPORT_SYMBOL(obd_zombie_barrier);
1731
1732
1733 /**
1734  * destroy zombie export/import thread.
1735  */
1736 static int obd_zombie_impexp_thread(void *unused)
1737 {
1738         unshare_fs_struct();
1739         complete(&obd_zombie_start);
1740
1741         obd_zombie_pid = current_pid();
1742
1743         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1744                 struct l_wait_info lwi = { 0 };
1745
1746                 l_wait_event(obd_zombie_waitq,
1747                              !obd_zombie_impexp_check(NULL), &lwi);
1748                 obd_zombie_impexp_cull();
1749
1750                 /*
1751                  * Notify obd_zombie_barrier callers that queues
1752                  * may be empty.
1753                  */
1754                 wake_up(&obd_zombie_waitq);
1755         }
1756
1757         complete(&obd_zombie_stop);
1758
1759         RETURN(0);
1760 }
1761
1762
1763 /**
1764  * start destroy zombie import/export thread
1765  */
1766 int obd_zombie_impexp_init(void)
1767 {
1768         struct task_struct *task;
1769
1770         INIT_LIST_HEAD(&obd_zombie_imports);
1771
1772         INIT_LIST_HEAD(&obd_zombie_exports);
1773         spin_lock_init(&obd_zombie_impexp_lock);
1774         init_completion(&obd_zombie_start);
1775         init_completion(&obd_zombie_stop);
1776         init_waitqueue_head(&obd_zombie_waitq);
1777         obd_zombie_pid = 0;
1778
1779         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1780         if (IS_ERR(task))
1781                 RETURN(PTR_ERR(task));
1782
1783         wait_for_completion(&obd_zombie_start);
1784         RETURN(0);
1785 }
1786 /**
1787  * stop destroy zombie import/export thread
1788  */
1789 void obd_zombie_impexp_stop(void)
1790 {
1791         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1792         obd_zombie_impexp_notify();
1793         wait_for_completion(&obd_zombie_stop);
1794 }
1795
1796 /***** Kernel-userspace comm helpers *******/
1797
1798 /* Get length of entire message, including header */
1799 int kuc_len(int payload_len)
1800 {
1801         return sizeof(struct kuc_hdr) + payload_len;
1802 }
1803 EXPORT_SYMBOL(kuc_len);
1804
1805 /* Get a pointer to kuc header, given a ptr to the payload
1806  * @param p Pointer to payload area
1807  * @returns Pointer to kuc header
1808  */
1809 struct kuc_hdr * kuc_ptr(void *p)
1810 {
1811         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1812         LASSERT(lh->kuc_magic == KUC_MAGIC);
1813         return lh;
1814 }
1815 EXPORT_SYMBOL(kuc_ptr);
1816
1817 /* Test if payload is part of kuc message
1818  * @param p Pointer to payload area
1819  * @returns boolean
1820  */
1821 int kuc_ispayload(void *p)
1822 {
1823         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1824
1825         if (kh->kuc_magic == KUC_MAGIC)
1826                 return 1;
1827         else
1828                 return 0;
1829 }
1830 EXPORT_SYMBOL(kuc_ispayload);
1831
1832 /* Alloc space for a message, and fill in header
1833  * @return Pointer to payload area
1834  */
1835 void *kuc_alloc(int payload_len, int transport, int type)
1836 {
1837         struct kuc_hdr *lh;
1838         int len = kuc_len(payload_len);
1839
1840         OBD_ALLOC(lh, len);
1841         if (lh == NULL)
1842                 return ERR_PTR(-ENOMEM);
1843
1844         lh->kuc_magic = KUC_MAGIC;
1845         lh->kuc_transport = transport;
1846         lh->kuc_msgtype = type;
1847         lh->kuc_msglen = len;
1848
1849         return (void *)(lh + 1);
1850 }
1851 EXPORT_SYMBOL(kuc_alloc);
1852
/* Free a message previously obtained from kuc_alloc().
 *
 * Not declared inline: the function is exported with EXPORT_SYMBOL and so
 * needs an ordinary external definition. A bare "inline" on an externally
 * linked function only yields one under gnu89 inline semantics.
 *
 * @param p Pointer to payload area (not to the header)
 * @param payload_len Payload length that was passed to kuc_alloc()
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1860
1861 struct obd_request_slot_waiter {
1862         struct list_head        orsw_entry;
1863         wait_queue_head_t       orsw_waitq;
1864         bool                    orsw_signaled;
1865 };
1866
1867 static bool obd_request_slot_avail(struct client_obd *cli,
1868                                    struct obd_request_slot_waiter *orsw)
1869 {
1870         bool avail;
1871
1872         spin_lock(&cli->cl_loi_list_lock);
1873         avail = !!list_empty(&orsw->orsw_entry);
1874         spin_unlock(&cli->cl_loi_list_lock);
1875
1876         return avail;
1877 };
1878
1879 /*
1880  * For network flow control, the RPC sponsor needs to acquire a credit
1881  * before sending the RPC. The credits count for a connection is defined
1882  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1883  * the subsequent RPC sponsors need to wait until others released their
1884  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1885  */
1886 int obd_get_request_slot(struct client_obd *cli)
1887 {
1888         struct obd_request_slot_waiter   orsw;
1889         struct l_wait_info               lwi;
1890         int                              rc;
1891
1892         spin_lock(&cli->cl_loi_list_lock);
1893         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1894                 cli->cl_r_in_flight++;
1895                 spin_unlock(&cli->cl_loi_list_lock);
1896                 return 0;
1897         }
1898
1899         init_waitqueue_head(&orsw.orsw_waitq);
1900         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1901         orsw.orsw_signaled = false;
1902         spin_unlock(&cli->cl_loi_list_lock);
1903
1904         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1905         rc = l_wait_event(orsw.orsw_waitq,
1906                           obd_request_slot_avail(cli, &orsw) ||
1907                           orsw.orsw_signaled,
1908                           &lwi);
1909
1910         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1911          * freed but other (such as obd_put_request_slot) is using it. */
1912         spin_lock(&cli->cl_loi_list_lock);
1913         if (rc != 0) {
1914                 if (!orsw.orsw_signaled) {
1915                         if (list_empty(&orsw.orsw_entry))
1916                                 cli->cl_r_in_flight--;
1917                         else
1918                                 list_del(&orsw.orsw_entry);
1919                 }
1920         }
1921
1922         if (orsw.orsw_signaled) {
1923                 LASSERT(list_empty(&orsw.orsw_entry));
1924
1925                 rc = -EINTR;
1926         }
1927         spin_unlock(&cli->cl_loi_list_lock);
1928
1929         return rc;
1930 }
1931 EXPORT_SYMBOL(obd_get_request_slot);
1932
1933 void obd_put_request_slot(struct client_obd *cli)
1934 {
1935         struct obd_request_slot_waiter *orsw;
1936
1937         spin_lock(&cli->cl_loi_list_lock);
1938         cli->cl_r_in_flight--;
1939
1940         /* If there is free slot, wakeup the first waiter. */
1941         if (!list_empty(&cli->cl_loi_read_list) &&
1942             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
1943                 orsw = list_entry(cli->cl_loi_read_list.next,
1944                                   struct obd_request_slot_waiter, orsw_entry);
1945                 list_del_init(&orsw->orsw_entry);
1946                 cli->cl_r_in_flight++;
1947                 wake_up(&orsw->orsw_waitq);
1948         }
1949         spin_unlock(&cli->cl_loi_list_lock);
1950 }
1951 EXPORT_SYMBOL(obd_put_request_slot);
1952
1953 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1954 {
1955         return cli->cl_max_rpcs_in_flight;
1956 }
1957 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
1958
1959 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1960 {
1961         struct obd_request_slot_waiter *orsw;
1962         __u32                           old;
1963         int                             diff;
1964         int                             i;
1965
1966         if (max > OBD_MAX_RIF_MAX || max < 1)
1967                 return -ERANGE;
1968
1969         spin_lock(&cli->cl_loi_list_lock);
1970         old = cli->cl_max_rpcs_in_flight;
1971         cli->cl_max_rpcs_in_flight = max;
1972         diff = max - old;
1973
1974         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
1975         for (i = 0; i < diff; i++) {
1976                 if (list_empty(&cli->cl_loi_read_list))
1977                         break;
1978
1979                 orsw = list_entry(cli->cl_loi_read_list.next,
1980                                   struct obd_request_slot_waiter, orsw_entry);
1981                 list_del_init(&orsw->orsw_entry);
1982                 cli->cl_r_in_flight++;
1983                 wake_up(&orsw->orsw_waitq);
1984         }
1985         spin_unlock(&cli->cl_loi_list_lock);
1986
1987         return 0;
1988 }
1989 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);