Whamcloud - gitweb
LU-5275 lprocfs: remove last of non seq data structs and functions.
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include <obd_class.h>
44 #include <lprocfs_status.h>
45
46 spinlock_t obd_types_lock;
47
48 struct kmem_cache *obd_device_cachep;
49 struct kmem_cache *obdo_cachep;
50 EXPORT_SYMBOL(obdo_cachep);
51 struct kmem_cache *import_cachep;
52
53 struct list_head obd_zombie_imports;
54 struct list_head obd_zombie_exports;
55 spinlock_t  obd_zombie_impexp_lock;
56 static void obd_zombie_impexp_notify(void);
57 static void obd_zombie_export_add(struct obd_export *exp);
58 static void obd_zombie_import_add(struct obd_import *imp);
59 static void print_export_data(struct obd_export *exp,
60                               const char *status, int locks);
61
62 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
63 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
64
65 /*
66  * support functions: we could use inter-module communication, but this
67  * is more portable to other OS's
68  */
69 static struct obd_device *obd_device_alloc(void)
70 {
71         struct obd_device *obd;
72
73         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
74         if (obd != NULL) {
75                 obd->obd_magic = OBD_DEVICE_MAGIC;
76         }
77         return obd;
78 }
79
80 static void obd_device_free(struct obd_device *obd)
81 {
82         LASSERT(obd != NULL);
83         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
84                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
85         if (obd->obd_namespace != NULL) {
86                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
87                        obd, obd->obd_namespace, obd->obd_force);
88                 LBUG();
89         }
90         lu_ref_fini(&obd->obd_reference);
91         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
92 }
93
94 struct obd_type *class_search_type(const char *name)
95 {
96         struct list_head *tmp;
97         struct obd_type *type;
98
99         spin_lock(&obd_types_lock);
100         list_for_each(tmp, &obd_types) {
101                 type = list_entry(tmp, struct obd_type, typ_chain);
102                 if (strcmp(type->typ_name, name) == 0) {
103                         spin_unlock(&obd_types_lock);
104                         return type;
105                 }
106         }
107         spin_unlock(&obd_types_lock);
108         return NULL;
109 }
110 EXPORT_SYMBOL(class_search_type);
111
112 struct obd_type *class_get_type(const char *name)
113 {
114         struct obd_type *type = class_search_type(name);
115
116 #ifdef HAVE_MODULE_LOADING_SUPPORT
117         if (!type) {
118                 const char *modname = name;
119
120                 if (strcmp(modname, "obdfilter") == 0)
121                         modname = "ofd";
122
123                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
124                         modname = LUSTRE_OSP_NAME;
125
126                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
127                         modname = LUSTRE_MDT_NAME;
128
129                 if (!request_module("%s", modname)) {
130                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131                         type = class_search_type(name);
132                 } else {
133                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134                                            modname);
135                 }
136         }
137 #endif
138         if (type) {
139                 spin_lock(&type->obd_type_lock);
140                 type->typ_refcnt++;
141                 try_module_get(type->typ_dt_ops->o_owner);
142                 spin_unlock(&type->obd_type_lock);
143         }
144         return type;
145 }
146 EXPORT_SYMBOL(class_get_type);
147
148 void class_put_type(struct obd_type *type)
149 {
150         LASSERT(type);
151         spin_lock(&type->obd_type_lock);
152         type->typ_refcnt--;
153         module_put(type->typ_dt_ops->o_owner);
154         spin_unlock(&type->obd_type_lock);
155 }
156 EXPORT_SYMBOL(class_put_type);
157
158 #define CLASS_MAX_NAME 1024
159
160 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
161                         bool enable_proc, struct lprocfs_vars *vars,
162                         const char *name, struct lu_device_type *ldt)
163 {
164         struct obd_type *type;
165         int rc = 0;
166         ENTRY;
167
168         /* sanity check */
169         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
170
171         if (class_search_type(name)) {
172                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
173                 RETURN(-EEXIST);
174         }
175
176         rc = -ENOMEM;
177         OBD_ALLOC(type, sizeof(*type));
178         if (type == NULL)
179                 RETURN(rc);
180
181         OBD_ALLOC_PTR(type->typ_dt_ops);
182         OBD_ALLOC_PTR(type->typ_md_ops);
183         OBD_ALLOC(type->typ_name, strlen(name) + 1);
184
185         if (type->typ_dt_ops == NULL ||
186             type->typ_md_ops == NULL ||
187             type->typ_name == NULL)
188                 GOTO (failed, rc);
189
190         *(type->typ_dt_ops) = *dt_ops;
191         /* md_ops is optional */
192         if (md_ops)
193                 *(type->typ_md_ops) = *md_ops;
194         strcpy(type->typ_name, name);
195         spin_lock_init(&type->obd_type_lock);
196
197 #ifdef LPROCFS
198         if (enable_proc) {
199                 type->typ_procroot = lprocfs_register(type->typ_name,
200                                                       proc_lustre_root,
201                                                       vars, type);
202                 if (IS_ERR(type->typ_procroot)) {
203                         rc = PTR_ERR(type->typ_procroot);
204                         type->typ_procroot = NULL;
205                         GOTO(failed, rc);
206                 }
207         }
208 #endif
209         if (ldt != NULL) {
210                 type->typ_lu = ldt;
211                 rc = lu_device_type_init(ldt);
212                 if (rc != 0)
213                         GOTO (failed, rc);
214         }
215
216         spin_lock(&obd_types_lock);
217         list_add(&type->typ_chain, &obd_types);
218         spin_unlock(&obd_types_lock);
219
220         RETURN (0);
221
222 failed:
223         if (type->typ_name != NULL) {
224 #ifdef LPROCFS
225                 if (type->typ_procroot != NULL)
226                         remove_proc_subtree(type->typ_name, proc_lustre_root);
227 #endif
228                 OBD_FREE(type->typ_name, strlen(name) + 1);
229         }
230         if (type->typ_md_ops != NULL)
231                 OBD_FREE_PTR(type->typ_md_ops);
232         if (type->typ_dt_ops != NULL)
233                 OBD_FREE_PTR(type->typ_dt_ops);
234         OBD_FREE(type, sizeof(*type));
235         RETURN(rc);
236 }
237 EXPORT_SYMBOL(class_register_type);
238
239 int class_unregister_type(const char *name)
240 {
241         struct obd_type *type = class_search_type(name);
242         ENTRY;
243
244         if (!type) {
245                 CERROR("unknown obd type\n");
246                 RETURN(-EINVAL);
247         }
248
249         if (type->typ_refcnt) {
250                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
251                 /* This is a bad situation, let's make the best of it */
252                 /* Remove ops, but leave the name for debugging */
253                 OBD_FREE_PTR(type->typ_dt_ops);
254                 OBD_FREE_PTR(type->typ_md_ops);
255                 RETURN(-EBUSY);
256         }
257
258         /* we do not use type->typ_procroot as for compatibility purposes
259          * other modules can share names (i.e. lod can use lov entry). so
260          * we can't reference pointer as it can get invalided when another
261          * module removes the entry */
262 #ifdef LPROCFS
263         if (type->typ_procroot != NULL)
264                 remove_proc_subtree(type->typ_name, proc_lustre_root);
265         if (type->typ_procsym != NULL)
266                 lprocfs_remove(&type->typ_procsym);
267 #endif
268         if (type->typ_lu)
269                 lu_device_type_fini(type->typ_lu);
270
271         spin_lock(&obd_types_lock);
272         list_del(&type->typ_chain);
273         spin_unlock(&obd_types_lock);
274         OBD_FREE(type->typ_name, strlen(name) + 1);
275         if (type->typ_dt_ops != NULL)
276                 OBD_FREE_PTR(type->typ_dt_ops);
277         if (type->typ_md_ops != NULL)
278                 OBD_FREE_PTR(type->typ_md_ops);
279         OBD_FREE(type, sizeof(*type));
280         RETURN(0);
281 } /* class_unregister_type */
282 EXPORT_SYMBOL(class_unregister_type);
283
284 /**
285  * Create a new obd device.
286  *
287  * Find an empty slot in ::obd_devs[], create a new obd device in it.
288  *
289  * \param[in] type_name obd device type string.
290  * \param[in] name      obd device name.
291  *
292  * \retval NULL if create fails, otherwise return the obd device
293  *         pointer created.
294  */
295 struct obd_device *class_newdev(const char *type_name, const char *name)
296 {
297         struct obd_device *result = NULL;
298         struct obd_device *newdev;
299         struct obd_type *type = NULL;
300         int i;
301         int new_obd_minor = 0;
302         ENTRY;
303
304         if (strlen(name) >= MAX_OBD_NAME) {
305                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
306                 RETURN(ERR_PTR(-EINVAL));
307         }
308
309         type = class_get_type(type_name);
310         if (type == NULL){
311                 CERROR("OBD: unknown type: %s\n", type_name);
312                 RETURN(ERR_PTR(-ENODEV));
313         }
314
315         newdev = obd_device_alloc();
316         if (newdev == NULL)
317                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
318
319         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
320
321         write_lock(&obd_dev_lock);
322         for (i = 0; i < class_devno_max(); i++) {
323                 struct obd_device *obd = class_num2obd(i);
324
325                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
326                         CERROR("Device %s already exists at %d, won't add\n",
327                                name, i);
328                         if (result) {
329                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
330                                          "%p obd_magic %08x != %08x\n", result,
331                                          result->obd_magic, OBD_DEVICE_MAGIC);
332                                 LASSERTF(result->obd_minor == new_obd_minor,
333                                          "%p obd_minor %d != %d\n", result,
334                                          result->obd_minor, new_obd_minor);
335
336                                 obd_devs[result->obd_minor] = NULL;
337                                 result->obd_name[0]='\0';
338                          }
339                         result = ERR_PTR(-EEXIST);
340                         break;
341                 }
342                 if (!result && !obd) {
343                         result = newdev;
344                         result->obd_minor = i;
345                         new_obd_minor = i;
346                         result->obd_type = type;
347                         strncpy(result->obd_name, name,
348                                 sizeof(result->obd_name) - 1);
349                         obd_devs[i] = result;
350                 }
351         }
352         write_unlock(&obd_dev_lock);
353
354         if (result == NULL && i >= class_devno_max()) {
355                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
356                        class_devno_max());
357                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
358         }
359
360         if (IS_ERR(result))
361                 GOTO(out, result);
362
363         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
364                result->obd_name, result);
365
366         RETURN(result);
367 out:
368         obd_device_free(newdev);
369 out_type:
370         class_put_type(type);
371         return result;
372 }
373
374 void class_release_dev(struct obd_device *obd)
375 {
376         struct obd_type *obd_type = obd->obd_type;
377
378         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
379                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
380         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
381                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
382         LASSERT(obd_type != NULL);
383
384         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
385                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
386
387         write_lock(&obd_dev_lock);
388         obd_devs[obd->obd_minor] = NULL;
389         write_unlock(&obd_dev_lock);
390         obd_device_free(obd);
391
392         class_put_type(obd_type);
393 }
394
395 int class_name2dev(const char *name)
396 {
397         int i;
398
399         if (!name)
400                 return -1;
401
402         read_lock(&obd_dev_lock);
403         for (i = 0; i < class_devno_max(); i++) {
404                 struct obd_device *obd = class_num2obd(i);
405
406                 if (obd && strcmp(name, obd->obd_name) == 0) {
407                         /* Make sure we finished attaching before we give
408                            out any references */
409                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
410                         if (obd->obd_attached) {
411                                 read_unlock(&obd_dev_lock);
412                                 return i;
413                         }
414                         break;
415                 }
416         }
417         read_unlock(&obd_dev_lock);
418
419         return -1;
420 }
421 EXPORT_SYMBOL(class_name2dev);
422
423 struct obd_device *class_name2obd(const char *name)
424 {
425         int dev = class_name2dev(name);
426
427         if (dev < 0 || dev > class_devno_max())
428                 return NULL;
429         return class_num2obd(dev);
430 }
431 EXPORT_SYMBOL(class_name2obd);
432
433 int class_uuid2dev(struct obd_uuid *uuid)
434 {
435         int i;
436
437         read_lock(&obd_dev_lock);
438         for (i = 0; i < class_devno_max(); i++) {
439                 struct obd_device *obd = class_num2obd(i);
440
441                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
442                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
443                         read_unlock(&obd_dev_lock);
444                         return i;
445                 }
446         }
447         read_unlock(&obd_dev_lock);
448
449         return -1;
450 }
451 EXPORT_SYMBOL(class_uuid2dev);
452
453 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
454 {
455         int dev = class_uuid2dev(uuid);
456         if (dev < 0)
457                 return NULL;
458         return class_num2obd(dev);
459 }
460 EXPORT_SYMBOL(class_uuid2obd);
461
462 /**
463  * Get obd device from ::obd_devs[]
464  *
465  * \param num [in] array index
466  *
467  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
468  *         otherwise return the obd device there.
469  */
470 struct obd_device *class_num2obd(int num)
471 {
472         struct obd_device *obd = NULL;
473
474         if (num < class_devno_max()) {
475                 obd = obd_devs[num];
476                 if (obd == NULL)
477                         return NULL;
478
479                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
480                          "%p obd_magic %08x != %08x\n",
481                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
482                 LASSERTF(obd->obd_minor == num,
483                          "%p obd_minor %0d != %0d\n",
484                          obd, obd->obd_minor, num);
485         }
486
487         return obd;
488 }
489 EXPORT_SYMBOL(class_num2obd);
490
491 /**
492  * Get obd devices count. Device in any
493  *    state are counted
494  * \retval obd device count
495  */
496 int get_devices_count(void)
497 {
498         int index, max_index = class_devno_max(), dev_count = 0;
499
500         read_lock(&obd_dev_lock);
501         for (index = 0; index <= max_index; index++) {
502                 struct obd_device *obd = class_num2obd(index);
503                 if (obd != NULL)
504                         dev_count++;
505         }
506         read_unlock(&obd_dev_lock);
507
508         return dev_count;
509 }
510 EXPORT_SYMBOL(get_devices_count);
511
512 void class_obd_list(void)
513 {
514         char *status;
515         int i;
516
517         read_lock(&obd_dev_lock);
518         for (i = 0; i < class_devno_max(); i++) {
519                 struct obd_device *obd = class_num2obd(i);
520
521                 if (obd == NULL)
522                         continue;
523                 if (obd->obd_stopping)
524                         status = "ST";
525                 else if (obd->obd_set_up)
526                         status = "UP";
527                 else if (obd->obd_attached)
528                         status = "AT";
529                 else
530                         status = "--";
531                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
532                          i, status, obd->obd_type->typ_name,
533                          obd->obd_name, obd->obd_uuid.uuid,
534                          atomic_read(&obd->obd_refcount));
535         }
536         read_unlock(&obd_dev_lock);
537         return;
538 }
539
540 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
541    specified, then only the client with that uuid is returned,
542    otherwise any client connected to the tgt is returned. */
543 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
544                                           const char * typ_name,
545                                           struct obd_uuid *grp_uuid)
546 {
547         int i;
548
549         read_lock(&obd_dev_lock);
550         for (i = 0; i < class_devno_max(); i++) {
551                 struct obd_device *obd = class_num2obd(i);
552
553                 if (obd == NULL)
554                         continue;
555                 if ((strncmp(obd->obd_type->typ_name, typ_name,
556                              strlen(typ_name)) == 0)) {
557                         if (obd_uuid_equals(tgt_uuid,
558                                             &obd->u.cli.cl_target_uuid) &&
559                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
560                                                          &obd->obd_uuid) : 1)) {
561                                 read_unlock(&obd_dev_lock);
562                                 return obd;
563                         }
564                 }
565         }
566         read_unlock(&obd_dev_lock);
567
568         return NULL;
569 }
570 EXPORT_SYMBOL(class_find_client_obd);
571
572 /* Iterate the obd_device list looking devices have grp_uuid. Start
573    searching at *next, and if a device is found, the next index to look
574    at is saved in *next. If next is NULL, then the first matching device
575    will always be returned. */
576 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
577 {
578         int i;
579
580         if (next == NULL)
581                 i = 0;
582         else if (*next >= 0 && *next < class_devno_max())
583                 i = *next;
584         else
585                 return NULL;
586
587         read_lock(&obd_dev_lock);
588         for (; i < class_devno_max(); i++) {
589                 struct obd_device *obd = class_num2obd(i);
590
591                 if (obd == NULL)
592                         continue;
593                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
594                         if (next != NULL)
595                                 *next = i+1;
596                         read_unlock(&obd_dev_lock);
597                         return obd;
598                 }
599         }
600         read_unlock(&obd_dev_lock);
601
602         return NULL;
603 }
604 EXPORT_SYMBOL(class_devices_in_group);
605
606 /**
607  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
608  * adjust sptlrpc settings accordingly.
609  */
610 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
611 {
612         struct obd_device  *obd;
613         const char         *type;
614         int                 i, rc = 0, rc2;
615
616         LASSERT(namelen > 0);
617
618         read_lock(&obd_dev_lock);
619         for (i = 0; i < class_devno_max(); i++) {
620                 obd = class_num2obd(i);
621
622                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
623                         continue;
624
625                 /* only notify mdc, osc, mdt, ost */
626                 type = obd->obd_type->typ_name;
627                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
628                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
629                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
630                     strcmp(type, LUSTRE_OST_NAME) != 0)
631                         continue;
632
633                 if (strncmp(obd->obd_name, fsname, namelen))
634                         continue;
635
636                 class_incref(obd, __FUNCTION__, obd);
637                 read_unlock(&obd_dev_lock);
638                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
639                                          sizeof(KEY_SPTLRPC_CONF),
640                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
641                 rc = rc ? rc : rc2;
642                 class_decref(obd, __FUNCTION__, obd);
643                 read_lock(&obd_dev_lock);
644         }
645         read_unlock(&obd_dev_lock);
646         return rc;
647 }
648 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
649
650 void obd_cleanup_caches(void)
651 {
652         ENTRY;
653         if (obd_device_cachep) {
654                 kmem_cache_destroy(obd_device_cachep);
655                 obd_device_cachep = NULL;
656         }
657         if (obdo_cachep) {
658                 kmem_cache_destroy(obdo_cachep);
659                 obdo_cachep = NULL;
660         }
661         if (import_cachep) {
662                 kmem_cache_destroy(import_cachep);
663                 import_cachep = NULL;
664         }
665         if (capa_cachep) {
666                 kmem_cache_destroy(capa_cachep);
667                 capa_cachep = NULL;
668         }
669         EXIT;
670 }
671
672 int obd_init_caches(void)
673 {
674         int rc;
675         ENTRY;
676
677         LASSERT(obd_device_cachep == NULL);
678         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
679                                               sizeof(struct obd_device),
680                                               0, 0, NULL);
681         if (!obd_device_cachep)
682                 GOTO(out, rc = -ENOMEM);
683
684         LASSERT(obdo_cachep == NULL);
685         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
686                                         0, 0, NULL);
687         if (!obdo_cachep)
688                 GOTO(out, rc = -ENOMEM);
689
690         LASSERT(import_cachep == NULL);
691         import_cachep = kmem_cache_create("ll_import_cache",
692                                           sizeof(struct obd_import),
693                                           0, 0, NULL);
694         if (!import_cachep)
695                 GOTO(out, rc = -ENOMEM);
696
697         LASSERT(capa_cachep == NULL);
698         capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
699                                         0, 0, NULL);
700         if (!capa_cachep)
701                 GOTO(out, rc = -ENOMEM);
702
703         RETURN(0);
704 out:
705         obd_cleanup_caches();
706         RETURN(rc);
707 }
708
709 /* map connection to client */
710 struct obd_export *class_conn2export(struct lustre_handle *conn)
711 {
712         struct obd_export *export;
713         ENTRY;
714
715         if (!conn) {
716                 CDEBUG(D_CACHE, "looking for null handle\n");
717                 RETURN(NULL);
718         }
719
720         if (conn->cookie == -1) {  /* this means assign a new connection */
721                 CDEBUG(D_CACHE, "want a new connection\n");
722                 RETURN(NULL);
723         }
724
725         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
726         export = class_handle2object(conn->cookie, NULL);
727         RETURN(export);
728 }
729 EXPORT_SYMBOL(class_conn2export);
730
731 struct obd_device *class_exp2obd(struct obd_export *exp)
732 {
733         if (exp)
734                 return exp->exp_obd;
735         return NULL;
736 }
737 EXPORT_SYMBOL(class_exp2obd);
738
739 struct obd_device *class_conn2obd(struct lustre_handle *conn)
740 {
741         struct obd_export *export;
742         export = class_conn2export(conn);
743         if (export) {
744                 struct obd_device *obd = export->exp_obd;
745                 class_export_put(export);
746                 return obd;
747         }
748         return NULL;
749 }
750 EXPORT_SYMBOL(class_conn2obd);
751
752 struct obd_import *class_exp2cliimp(struct obd_export *exp)
753 {
754         struct obd_device *obd = exp->exp_obd;
755         if (obd == NULL)
756                 return NULL;
757         return obd->u.cli.cl_import;
758 }
759 EXPORT_SYMBOL(class_exp2cliimp);
760
761 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
762 {
763         struct obd_device *obd = class_conn2obd(conn);
764         if (obd == NULL)
765                 return NULL;
766         return obd->u.cli.cl_import;
767 }
768 EXPORT_SYMBOL(class_conn2cliimp);
769
770 /* Export management functions */
771 static void class_export_destroy(struct obd_export *exp)
772 {
773         struct obd_device *obd = exp->exp_obd;
774         ENTRY;
775
776         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
777         LASSERT(obd != NULL);
778
779         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
780                exp->exp_client_uuid.uuid, obd->obd_name);
781
782         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
783         if (exp->exp_connection)
784                 ptlrpc_put_connection_superhack(exp->exp_connection);
785
786         LASSERT(list_empty(&exp->exp_outstanding_replies));
787         LASSERT(list_empty(&exp->exp_uncommitted_replies));
788         LASSERT(list_empty(&exp->exp_req_replay_queue));
789         LASSERT(list_empty(&exp->exp_hp_rpcs));
790         obd_destroy_export(exp);
791         class_decref(obd, "export", exp);
792
793         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
794         EXIT;
795 }
796
797 static void export_handle_addref(void *export)
798 {
799         class_export_get(export);
800 }
801
802 static struct portals_handle_ops export_handle_ops = {
803         .hop_addref = export_handle_addref,
804         .hop_free   = NULL,
805 };
806
807 struct obd_export *class_export_get(struct obd_export *exp)
808 {
809         atomic_inc(&exp->exp_refcount);
810         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
811                atomic_read(&exp->exp_refcount));
812         return exp;
813 }
814 EXPORT_SYMBOL(class_export_get);
815
816 void class_export_put(struct obd_export *exp)
817 {
818         LASSERT(exp != NULL);
819         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
820         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
821                atomic_read(&exp->exp_refcount) - 1);
822
823         if (atomic_dec_and_test(&exp->exp_refcount)) {
824                 LASSERT(!list_empty(&exp->exp_obd_chain));
825                 CDEBUG(D_IOCTL, "final put %p/%s\n",
826                        exp, exp->exp_client_uuid.uuid);
827
828                 /* release nid stat refererence */
829                 lprocfs_exp_cleanup(exp);
830
831                 obd_zombie_export_add(exp);
832         }
833 }
834 EXPORT_SYMBOL(class_export_put);
835
836 /* Creates a new export, adds it to the hash table, and returns a
837  * pointer to it. The refcount is 2: one for the hash reference, and
838  * one for the pointer returned by this function. */
839 struct obd_export *class_new_export(struct obd_device *obd,
840                                     struct obd_uuid *cluuid)
841 {
842         struct obd_export *export;
843         cfs_hash_t *hash = NULL;
844         int rc = 0;
845         ENTRY;
846
847         OBD_ALLOC_PTR(export);
848         if (!export)
849                 return ERR_PTR(-ENOMEM);
850
851         export->exp_conn_cnt = 0;
852         export->exp_lock_hash = NULL;
853         export->exp_flock_hash = NULL;
854         atomic_set(&export->exp_refcount, 2);
855         atomic_set(&export->exp_rpc_count, 0);
856         atomic_set(&export->exp_cb_count, 0);
857         atomic_set(&export->exp_locks_count, 0);
858 #if LUSTRE_TRACKS_LOCK_EXP_REFS
859         INIT_LIST_HEAD(&export->exp_locks_list);
860         spin_lock_init(&export->exp_locks_list_guard);
861 #endif
862         atomic_set(&export->exp_replay_count, 0);
863         export->exp_obd = obd;
864         INIT_LIST_HEAD(&export->exp_outstanding_replies);
865         spin_lock_init(&export->exp_uncommitted_replies_lock);
866         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
867         INIT_LIST_HEAD(&export->exp_req_replay_queue);
868         INIT_LIST_HEAD(&export->exp_handle.h_link);
869         INIT_LIST_HEAD(&export->exp_hp_rpcs);
870         INIT_LIST_HEAD(&export->exp_reg_rpcs);
871         class_handle_hash(&export->exp_handle, &export_handle_ops);
872         export->exp_last_request_time = cfs_time_current_sec();
873         spin_lock_init(&export->exp_lock);
874         spin_lock_init(&export->exp_rpc_lock);
875         INIT_HLIST_NODE(&export->exp_uuid_hash);
876         INIT_HLIST_NODE(&export->exp_nid_hash);
877         spin_lock_init(&export->exp_bl_list_lock);
878         INIT_LIST_HEAD(&export->exp_bl_list);
879
880         export->exp_sp_peer = LUSTRE_SP_ANY;
881         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
882         export->exp_client_uuid = *cluuid;
883         obd_init_export(export);
884
885         spin_lock(&obd->obd_dev_lock);
886         /* shouldn't happen, but might race */
887         if (obd->obd_stopping)
888                 GOTO(exit_unlock, rc = -ENODEV);
889
890         hash = cfs_hash_getref(obd->obd_uuid_hash);
891         if (hash == NULL)
892                 GOTO(exit_unlock, rc = -ENODEV);
893         spin_unlock(&obd->obd_dev_lock);
894
895         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
896                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
897                 if (rc != 0) {
898                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
899                                       obd->obd_name, cluuid->uuid, rc);
900                         GOTO(exit_err, rc = -EALREADY);
901                 }
902         }
903
904         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
905         spin_lock(&obd->obd_dev_lock);
906         if (obd->obd_stopping) {
907                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
908                 GOTO(exit_unlock, rc = -ENODEV);
909         }
910
911         class_incref(obd, "export", export);
912         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
913         list_add_tail(&export->exp_obd_chain_timed,
914                       &export->exp_obd->obd_exports_timed);
915         export->exp_obd->obd_num_exports++;
916         spin_unlock(&obd->obd_dev_lock);
917         cfs_hash_putref(hash);
918         RETURN(export);
919
920 exit_unlock:
921         spin_unlock(&obd->obd_dev_lock);
922 exit_err:
923         if (hash)
924                 cfs_hash_putref(hash);
925         class_handle_unhash(&export->exp_handle);
926         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
927         obd_destroy_export(export);
928         OBD_FREE_PTR(export);
929         return ERR_PTR(rc);
930 }
931 EXPORT_SYMBOL(class_new_export);
932
933 void class_unlink_export(struct obd_export *exp)
934 {
935         class_handle_unhash(&exp->exp_handle);
936
937         spin_lock(&exp->exp_obd->obd_dev_lock);
938         /* delete an uuid-export hashitem from hashtables */
939         if (!hlist_unhashed(&exp->exp_uuid_hash))
940                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
941                              &exp->exp_client_uuid,
942                              &exp->exp_uuid_hash);
943
944         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
945         list_del_init(&exp->exp_obd_chain_timed);
946         exp->exp_obd->obd_num_exports--;
947         spin_unlock(&exp->exp_obd->obd_dev_lock);
948         class_export_put(exp);
949 }
950 EXPORT_SYMBOL(class_unlink_export);
951
952 /* Import management functions */
953 void class_import_destroy(struct obd_import *imp)
954 {
955         ENTRY;
956
957         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
958                 imp->imp_obd->obd_name);
959
960         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
961
962         ptlrpc_put_connection_superhack(imp->imp_connection);
963
964         while (!list_empty(&imp->imp_conn_list)) {
965                 struct obd_import_conn *imp_conn;
966
967                 imp_conn = list_entry(imp->imp_conn_list.next,
968                                       struct obd_import_conn, oic_item);
969                 list_del_init(&imp_conn->oic_item);
970                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
971                 OBD_FREE(imp_conn, sizeof(*imp_conn));
972         }
973
974         LASSERT(imp->imp_sec == NULL);
975         class_decref(imp->imp_obd, "import", imp);
976         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
977         EXIT;
978 }
979
980 static void import_handle_addref(void *import)
981 {
982         class_import_get(import);
983 }
984
985 static struct portals_handle_ops import_handle_ops = {
986         .hop_addref = import_handle_addref,
987         .hop_free   = NULL,
988 };
989
990 struct obd_import *class_import_get(struct obd_import *import)
991 {
992         atomic_inc(&import->imp_refcount);
993         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
994                atomic_read(&import->imp_refcount),
995                import->imp_obd->obd_name);
996         return import;
997 }
998 EXPORT_SYMBOL(class_import_get);
999
1000 void class_import_put(struct obd_import *imp)
1001 {
1002         ENTRY;
1003
1004         LASSERT(list_empty(&imp->imp_zombie_chain));
1005         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1006
1007         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1008                atomic_read(&imp->imp_refcount) - 1,
1009                imp->imp_obd->obd_name);
1010
1011         if (atomic_dec_and_test(&imp->imp_refcount)) {
1012                 CDEBUG(D_INFO, "final put import %p\n", imp);
1013                 obd_zombie_import_add(imp);
1014         }
1015
1016         /* catch possible import put race */
1017         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1018         EXIT;
1019 }
1020 EXPORT_SYMBOL(class_import_put);
1021
1022 static void init_imp_at(struct imp_at *at) {
1023         int i;
1024         at_init(&at->iat_net_latency, 0, 0);
1025         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1026                 /* max service estimates are tracked on the server side, so
1027                    don't use the AT history here, just use the last reported
1028                    val. (But keep hist for proc histogram, worst_ever) */
1029                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1030                         AT_FLG_NOHIST);
1031         }
1032 }
1033
1034 struct obd_import *class_new_import(struct obd_device *obd)
1035 {
1036         struct obd_import *imp;
1037
1038         OBD_ALLOC(imp, sizeof(*imp));
1039         if (imp == NULL)
1040                 return NULL;
1041
1042         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1043         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1044         INIT_LIST_HEAD(&imp->imp_replay_list);
1045         INIT_LIST_HEAD(&imp->imp_sending_list);
1046         INIT_LIST_HEAD(&imp->imp_delayed_list);
1047         INIT_LIST_HEAD(&imp->imp_committed_list);
1048         imp->imp_replay_cursor = &imp->imp_committed_list;
1049         spin_lock_init(&imp->imp_lock);
1050         imp->imp_last_success_conn = 0;
1051         imp->imp_state = LUSTRE_IMP_NEW;
1052         imp->imp_obd = class_incref(obd, "import", imp);
1053         mutex_init(&imp->imp_sec_mutex);
1054         init_waitqueue_head(&imp->imp_recovery_waitq);
1055
1056         atomic_set(&imp->imp_refcount, 2);
1057         atomic_set(&imp->imp_unregistering, 0);
1058         atomic_set(&imp->imp_inflight, 0);
1059         atomic_set(&imp->imp_replay_inflight, 0);
1060         atomic_set(&imp->imp_inval_count, 0);
1061         INIT_LIST_HEAD(&imp->imp_conn_list);
1062         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1063         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1064         init_imp_at(&imp->imp_at);
1065
1066         /* the default magic is V2, will be used in connect RPC, and
1067          * then adjusted according to the flags in request/reply. */
1068         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1069
1070         return imp;
1071 }
1072 EXPORT_SYMBOL(class_new_import);
1073
1074 void class_destroy_import(struct obd_import *import)
1075 {
1076         LASSERT(import != NULL);
1077         LASSERT(import != LP_POISON);
1078
1079         class_handle_unhash(&import->imp_handle);
1080
1081         spin_lock(&import->imp_lock);
1082         import->imp_generation++;
1083         spin_unlock(&import->imp_lock);
1084         class_import_put(import);
1085 }
1086 EXPORT_SYMBOL(class_destroy_import);
1087
1088 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1089
1090 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1091 {
1092         spin_lock(&exp->exp_locks_list_guard);
1093
1094         LASSERT(lock->l_exp_refs_nr >= 0);
1095
1096         if (lock->l_exp_refs_target != NULL &&
1097             lock->l_exp_refs_target != exp) {
1098                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1099                               exp, lock, lock->l_exp_refs_target);
1100         }
1101         if ((lock->l_exp_refs_nr ++) == 0) {
1102                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1103                 lock->l_exp_refs_target = exp;
1104         }
1105         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1106                lock, exp, lock->l_exp_refs_nr);
1107         spin_unlock(&exp->exp_locks_list_guard);
1108 }
1109 EXPORT_SYMBOL(__class_export_add_lock_ref);
1110
1111 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1112 {
1113         spin_lock(&exp->exp_locks_list_guard);
1114         LASSERT(lock->l_exp_refs_nr > 0);
1115         if (lock->l_exp_refs_target != exp) {
1116                 LCONSOLE_WARN("lock %p, "
1117                               "mismatching export pointers: %p, %p\n",
1118                               lock, lock->l_exp_refs_target, exp);
1119         }
1120         if (-- lock->l_exp_refs_nr == 0) {
1121                 list_del_init(&lock->l_exp_refs_link);
1122                 lock->l_exp_refs_target = NULL;
1123         }
1124         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1125                lock, exp, lock->l_exp_refs_nr);
1126         spin_unlock(&exp->exp_locks_list_guard);
1127 }
1128 EXPORT_SYMBOL(__class_export_del_lock_ref);
1129 #endif
1130
1131 /* A connection defines an export context in which preallocation can
1132    be managed. This releases the export pointer reference, and returns
1133    the export handle, so the export refcount is 1 when this function
1134    returns. */
1135 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1136                   struct obd_uuid *cluuid)
1137 {
1138         struct obd_export *export;
1139         LASSERT(conn != NULL);
1140         LASSERT(obd != NULL);
1141         LASSERT(cluuid != NULL);
1142         ENTRY;
1143
1144         export = class_new_export(obd, cluuid);
1145         if (IS_ERR(export))
1146                 RETURN(PTR_ERR(export));
1147
1148         conn->cookie = export->exp_handle.h_cookie;
1149         class_export_put(export);
1150
1151         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1152                cluuid->uuid, conn->cookie);
1153         RETURN(0);
1154 }
1155 EXPORT_SYMBOL(class_connect);
1156
1157 /* if export is involved in recovery then clean up related things */
1158 void class_export_recovery_cleanup(struct obd_export *exp)
1159 {
1160         struct obd_device *obd = exp->exp_obd;
1161
1162         spin_lock(&obd->obd_recovery_task_lock);
1163         if (obd->obd_recovering) {
1164                 if (exp->exp_in_recovery) {
1165                         spin_lock(&exp->exp_lock);
1166                         exp->exp_in_recovery = 0;
1167                         spin_unlock(&exp->exp_lock);
1168                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1169                         atomic_dec(&obd->obd_connected_clients);
1170                 }
1171
1172                 /* if called during recovery then should update
1173                  * obd_stale_clients counter,
1174                  * lightweight exports are not counted */
1175                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1176                         exp->exp_obd->obd_stale_clients++;
1177         }
1178         spin_unlock(&obd->obd_recovery_task_lock);
1179
1180         spin_lock(&exp->exp_lock);
1181         /** Cleanup req replay fields */
1182         if (exp->exp_req_replay_needed) {
1183                 exp->exp_req_replay_needed = 0;
1184
1185                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1186                 atomic_dec(&obd->obd_req_replay_clients);
1187         }
1188
1189         /** Cleanup lock replay data */
1190         if (exp->exp_lock_replay_needed) {
1191                 exp->exp_lock_replay_needed = 0;
1192
1193                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1194                 atomic_dec(&obd->obd_lock_replay_clients);
1195         }
1196         spin_unlock(&exp->exp_lock);
1197 }
1198
1199 /* This function removes 1-3 references from the export:
1200  * 1 - for export pointer passed
1201  * and if disconnect really need
1202  * 2 - removing from hash
1203  * 3 - in client_unlink_export
1204  * The export pointer passed to this function can destroyed */
1205 int class_disconnect(struct obd_export *export)
1206 {
1207         int already_disconnected;
1208         ENTRY;
1209
1210         if (export == NULL) {
1211                 CWARN("attempting to free NULL export %p\n", export);
1212                 RETURN(-EINVAL);
1213         }
1214
1215         spin_lock(&export->exp_lock);
1216         already_disconnected = export->exp_disconnected;
1217         export->exp_disconnected = 1;
1218         spin_unlock(&export->exp_lock);
1219
1220         /* class_cleanup(), abort_recovery(), and class_fail_export()
1221          * all end up in here, and if any of them race we shouldn't
1222          * call extra class_export_puts(). */
1223         if (already_disconnected) {
1224                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1225                 GOTO(no_disconn, already_disconnected);
1226         }
1227
1228         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1229                export->exp_handle.h_cookie);
1230
1231         if (!hlist_unhashed(&export->exp_nid_hash))
1232                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1233                              &export->exp_connection->c_peer.nid,
1234                              &export->exp_nid_hash);
1235
1236         class_export_recovery_cleanup(export);
1237         class_unlink_export(export);
1238 no_disconn:
1239         class_export_put(export);
1240         RETURN(0);
1241 }
1242 EXPORT_SYMBOL(class_disconnect);
1243
1244 /* Return non-zero for a fully connected export */
1245 int class_connected_export(struct obd_export *exp)
1246 {
1247         int connected = 0;
1248
1249         if (exp) {
1250                 spin_lock(&exp->exp_lock);
1251                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1252                 spin_unlock(&exp->exp_lock);
1253         }
1254         return connected;
1255 }
1256 EXPORT_SYMBOL(class_connected_export);
1257
1258 static void class_disconnect_export_list(struct list_head *list,
1259                                          enum obd_option flags)
1260 {
1261         int rc;
1262         struct obd_export *exp;
1263         ENTRY;
1264
1265         /* It's possible that an export may disconnect itself, but
1266          * nothing else will be added to this list. */
1267         while (!list_empty(list)) {
1268                 exp = list_entry(list->next, struct obd_export,
1269                                  exp_obd_chain);
1270                 /* need for safe call CDEBUG after obd_disconnect */
1271                 class_export_get(exp);
1272
1273                 spin_lock(&exp->exp_lock);
1274                 exp->exp_flags = flags;
1275                 spin_unlock(&exp->exp_lock);
1276
1277                 if (obd_uuid_equals(&exp->exp_client_uuid,
1278                                     &exp->exp_obd->obd_uuid)) {
1279                         CDEBUG(D_HA,
1280                                "exp %p export uuid == obd uuid, don't discon\n",
1281                                exp);
1282                         /* Need to delete this now so we don't end up pointing
1283                          * to work_list later when this export is cleaned up. */
1284                         list_del_init(&exp->exp_obd_chain);
1285                         class_export_put(exp);
1286                         continue;
1287                 }
1288
1289                 class_export_get(exp);
1290                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1291                        "last request at "CFS_TIME_T"\n",
1292                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1293                        exp, exp->exp_last_request_time);
1294                 /* release one export reference anyway */
1295                 rc = obd_disconnect(exp);
1296
1297                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1298                        obd_export_nid2str(exp), exp, rc);
1299                 class_export_put(exp);
1300         }
1301         EXIT;
1302 }
1303
1304 void class_disconnect_exports(struct obd_device *obd)
1305 {
1306         struct list_head work_list;
1307         ENTRY;
1308
1309         /* Move all of the exports from obd_exports to a work list, en masse. */
1310         INIT_LIST_HEAD(&work_list);
1311         spin_lock(&obd->obd_dev_lock);
1312         list_splice_init(&obd->obd_exports, &work_list);
1313         list_splice_init(&obd->obd_delayed_exports, &work_list);
1314         spin_unlock(&obd->obd_dev_lock);
1315
1316         if (!list_empty(&work_list)) {
1317                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1318                        "disconnecting them\n", obd->obd_minor, obd);
1319                 class_disconnect_export_list(&work_list,
1320                                              exp_flags_from_obd(obd));
1321         } else
1322                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1323                        obd->obd_minor, obd);
1324         EXIT;
1325 }
1326 EXPORT_SYMBOL(class_disconnect_exports);
1327
1328 /* Remove exports that have not completed recovery.
1329  */
1330 void class_disconnect_stale_exports(struct obd_device *obd,
1331                                     int (*test_export)(struct obd_export *))
1332 {
1333         struct list_head work_list;
1334         struct obd_export *exp, *n;
1335         int evicted = 0;
1336         ENTRY;
1337
1338         INIT_LIST_HEAD(&work_list);
1339         spin_lock(&obd->obd_dev_lock);
1340         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1341                                  exp_obd_chain) {
1342                 /* don't count self-export as client */
1343                 if (obd_uuid_equals(&exp->exp_client_uuid,
1344                                     &exp->exp_obd->obd_uuid))
1345                         continue;
1346
1347                 /* don't evict clients which have no slot in last_rcvd
1348                  * (e.g. lightweight connection) */
1349                 if (exp->exp_target_data.ted_lr_idx == -1)
1350                         continue;
1351
1352                 spin_lock(&exp->exp_lock);
1353                 if (exp->exp_failed || test_export(exp)) {
1354                         spin_unlock(&exp->exp_lock);
1355                         continue;
1356                 }
1357                 exp->exp_failed = 1;
1358                 spin_unlock(&exp->exp_lock);
1359
1360                 list_move(&exp->exp_obd_chain, &work_list);
1361                 evicted++;
1362                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1363                        obd->obd_name, exp->exp_client_uuid.uuid,
1364                        exp->exp_connection == NULL ? "<unknown>" :
1365                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1366                 print_export_data(exp, "EVICTING", 0);
1367         }
1368         spin_unlock(&obd->obd_dev_lock);
1369
1370         if (evicted)
1371                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1372                               obd->obd_name, evicted);
1373
1374         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1375                                                  OBD_OPT_ABORT_RECOV);
1376         EXIT;
1377 }
1378 EXPORT_SYMBOL(class_disconnect_stale_exports);
1379
1380 void class_fail_export(struct obd_export *exp)
1381 {
1382         int rc, already_failed;
1383
1384         spin_lock(&exp->exp_lock);
1385         already_failed = exp->exp_failed;
1386         exp->exp_failed = 1;
1387         spin_unlock(&exp->exp_lock);
1388
1389         if (already_failed) {
1390                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1391                        exp, exp->exp_client_uuid.uuid);
1392                 return;
1393         }
1394
1395         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1396                exp, exp->exp_client_uuid.uuid);
1397
1398         if (obd_dump_on_timeout)
1399                 libcfs_debug_dumplog();
1400
1401         /* need for safe call CDEBUG after obd_disconnect */
1402         class_export_get(exp);
1403
1404         /* Most callers into obd_disconnect are removing their own reference
1405          * (request, for example) in addition to the one from the hash table.
1406          * We don't have such a reference here, so make one. */
1407         class_export_get(exp);
1408         rc = obd_disconnect(exp);
1409         if (rc)
1410                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1411         else
1412                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1413                        exp, exp->exp_client_uuid.uuid);
1414         class_export_put(exp);
1415 }
1416 EXPORT_SYMBOL(class_fail_export);
1417
1418 char *obd_export_nid2str(struct obd_export *exp)
1419 {
1420         if (exp->exp_connection != NULL)
1421                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1422
1423         return "(no nid)";
1424 }
1425 EXPORT_SYMBOL(obd_export_nid2str);
1426
1427 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1428 {
1429         cfs_hash_t *nid_hash;
1430         struct obd_export *doomed_exp = NULL;
1431         int exports_evicted = 0;
1432
1433         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1434
1435         spin_lock(&obd->obd_dev_lock);
1436         /* umount has run already, so evict thread should leave
1437          * its task to umount thread now */
1438         if (obd->obd_stopping) {
1439                 spin_unlock(&obd->obd_dev_lock);
1440                 return exports_evicted;
1441         }
1442         nid_hash = obd->obd_nid_hash;
1443         cfs_hash_getref(nid_hash);
1444         spin_unlock(&obd->obd_dev_lock);
1445
1446         do {
1447                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1448                 if (doomed_exp == NULL)
1449                         break;
1450
1451                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1452                          "nid %s found, wanted nid %s, requested nid %s\n",
1453                          obd_export_nid2str(doomed_exp),
1454                          libcfs_nid2str(nid_key), nid);
1455                 LASSERTF(doomed_exp != obd->obd_self_export,
1456                          "self-export is hashed by NID?\n");
1457                 exports_evicted++;
1458                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1459                               "request\n", obd->obd_name,
1460                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1461                               obd_export_nid2str(doomed_exp));
1462                 class_fail_export(doomed_exp);
1463                 class_export_put(doomed_exp);
1464         } while (1);
1465
1466         cfs_hash_putref(nid_hash);
1467
1468         if (!exports_evicted)
1469                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1470                        obd->obd_name, nid);
1471         return exports_evicted;
1472 }
1473 EXPORT_SYMBOL(obd_export_evict_by_nid);
1474
1475 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1476 {
1477         cfs_hash_t *uuid_hash;
1478         struct obd_export *doomed_exp = NULL;
1479         struct obd_uuid doomed_uuid;
1480         int exports_evicted = 0;
1481
1482         spin_lock(&obd->obd_dev_lock);
1483         if (obd->obd_stopping) {
1484                 spin_unlock(&obd->obd_dev_lock);
1485                 return exports_evicted;
1486         }
1487         uuid_hash = obd->obd_uuid_hash;
1488         cfs_hash_getref(uuid_hash);
1489         spin_unlock(&obd->obd_dev_lock);
1490
1491         obd_str2uuid(&doomed_uuid, uuid);
1492         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1493                 CERROR("%s: can't evict myself\n", obd->obd_name);
1494                 cfs_hash_putref(uuid_hash);
1495                 return exports_evicted;
1496         }
1497
1498         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1499
1500         if (doomed_exp == NULL) {
1501                 CERROR("%s: can't disconnect %s: no exports found\n",
1502                        obd->obd_name, uuid);
1503         } else {
1504                 CWARN("%s: evicting %s at adminstrative request\n",
1505                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1506                 class_fail_export(doomed_exp);
1507                 class_export_put(doomed_exp);
1508                 exports_evicted++;
1509         }
1510         cfs_hash_putref(uuid_hash);
1511
1512         return exports_evicted;
1513 }
1514 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1515
1516 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1517 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1518 EXPORT_SYMBOL(class_export_dump_hook);
1519 #endif
1520
1521 static void print_export_data(struct obd_export *exp, const char *status,
1522                               int locks)
1523 {
1524         struct ptlrpc_reply_state *rs;
1525         struct ptlrpc_reply_state *first_reply = NULL;
1526         int nreplies = 0;
1527
1528         spin_lock(&exp->exp_lock);
1529         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1530                             rs_exp_list) {
1531                 if (nreplies == 0)
1532                         first_reply = rs;
1533                 nreplies++;
1534         }
1535         spin_unlock(&exp->exp_lock);
1536
1537         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1538                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1539                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1540                atomic_read(&exp->exp_rpc_count),
1541                atomic_read(&exp->exp_cb_count),
1542                atomic_read(&exp->exp_locks_count),
1543                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1544                nreplies, first_reply, nreplies > 3 ? "..." : "",
1545                exp->exp_last_committed);
1546 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1547         if (locks && class_export_dump_hook != NULL)
1548                 class_export_dump_hook(exp);
1549 #endif
1550 }
1551
1552 void dump_exports(struct obd_device *obd, int locks)
1553 {
1554         struct obd_export *exp;
1555
1556         spin_lock(&obd->obd_dev_lock);
1557         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1558                 print_export_data(exp, "ACTIVE", locks);
1559         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1560                 print_export_data(exp, "UNLINKED", locks);
1561         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1562                 print_export_data(exp, "DELAYED", locks);
1563         spin_unlock(&obd->obd_dev_lock);
1564         spin_lock(&obd_zombie_impexp_lock);
1565         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1566                 print_export_data(exp, "ZOMBIE", locks);
1567         spin_unlock(&obd_zombie_impexp_lock);
1568 }
1569 EXPORT_SYMBOL(dump_exports);
1570
1571 void obd_exports_barrier(struct obd_device *obd)
1572 {
1573         int waited = 2;
1574         LASSERT(list_empty(&obd->obd_exports));
1575         spin_lock(&obd->obd_dev_lock);
1576         while (!list_empty(&obd->obd_unlinked_exports)) {
1577                 spin_unlock(&obd->obd_dev_lock);
1578                 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1579                                                    cfs_time_seconds(waited));
1580                 if (waited > 5 && IS_PO2(waited)) {
1581                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1582                                       "more than %d seconds. "
1583                                       "The obd refcount = %d. Is it stuck?\n",
1584                                       obd->obd_name, waited,
1585                                       atomic_read(&obd->obd_refcount));
1586                         dump_exports(obd, 1);
1587                 }
1588                 waited *= 2;
1589                 spin_lock(&obd->obd_dev_lock);
1590         }
1591         spin_unlock(&obd->obd_dev_lock);
1592 }
1593 EXPORT_SYMBOL(obd_exports_barrier);
1594
1595 /* Total amount of zombies to be destroyed */
1596 static int zombies_count = 0;
1597
1598 /**
1599  * kill zombie imports and exports
1600  */
1601 void obd_zombie_impexp_cull(void)
1602 {
1603         struct obd_import *import;
1604         struct obd_export *export;
1605         ENTRY;
1606
1607         do {
1608                 spin_lock(&obd_zombie_impexp_lock);
1609
1610                 import = NULL;
1611                 if (!list_empty(&obd_zombie_imports)) {
1612                         import = list_entry(obd_zombie_imports.next,
1613                                             struct obd_import,
1614                                             imp_zombie_chain);
1615                         list_del_init(&import->imp_zombie_chain);
1616                 }
1617
1618                 export = NULL;
1619                 if (!list_empty(&obd_zombie_exports)) {
1620                         export = list_entry(obd_zombie_exports.next,
1621                                             struct obd_export,
1622                                             exp_obd_chain);
1623                         list_del_init(&export->exp_obd_chain);
1624                 }
1625
1626                 spin_unlock(&obd_zombie_impexp_lock);
1627
1628                 if (import != NULL) {
1629                         class_import_destroy(import);
1630                         spin_lock(&obd_zombie_impexp_lock);
1631                         zombies_count--;
1632                         spin_unlock(&obd_zombie_impexp_lock);
1633                 }
1634
1635                 if (export != NULL) {
1636                         class_export_destroy(export);
1637                         spin_lock(&obd_zombie_impexp_lock);
1638                         zombies_count--;
1639                         spin_unlock(&obd_zombie_impexp_lock);
1640                 }
1641
1642                 cond_resched();
1643         } while (import != NULL || export != NULL);
1644         EXIT;
1645 }
1646
1647 static struct completion        obd_zombie_start;
1648 static struct completion        obd_zombie_stop;
1649 static unsigned long            obd_zombie_flags;
1650 static wait_queue_head_t        obd_zombie_waitq;
1651 static pid_t                    obd_zombie_pid;
1652
1653 enum {
1654         OBD_ZOMBIE_STOP         = 0x0001,
1655 };
1656
1657 /**
1658  * check for work for kill zombie import/export thread.
1659  */
1660 static int obd_zombie_impexp_check(void *arg)
1661 {
1662         int rc;
1663
1664         spin_lock(&obd_zombie_impexp_lock);
1665         rc = (zombies_count == 0) &&
1666              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1667         spin_unlock(&obd_zombie_impexp_lock);
1668
1669         RETURN(rc);
1670 }
1671
1672 /**
1673  * Add export to the obd_zombe thread and notify it.
1674  */
1675 static void obd_zombie_export_add(struct obd_export *exp) {
1676         spin_lock(&exp->exp_obd->obd_dev_lock);
1677         LASSERT(!list_empty(&exp->exp_obd_chain));
1678         list_del_init(&exp->exp_obd_chain);
1679         spin_unlock(&exp->exp_obd->obd_dev_lock);
1680         spin_lock(&obd_zombie_impexp_lock);
1681         zombies_count++;
1682         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1683         spin_unlock(&obd_zombie_impexp_lock);
1684
1685         obd_zombie_impexp_notify();
1686 }
1687
1688 /**
1689  * Add import to the obd_zombe thread and notify it.
1690  */
1691 static void obd_zombie_import_add(struct obd_import *imp) {
1692         LASSERT(imp->imp_sec == NULL);
1693         LASSERT(imp->imp_rq_pool == NULL);
1694         spin_lock(&obd_zombie_impexp_lock);
1695         LASSERT(list_empty(&imp->imp_zombie_chain));
1696         zombies_count++;
1697         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1698         spin_unlock(&obd_zombie_impexp_lock);
1699
1700         obd_zombie_impexp_notify();
1701 }
1702
1703 /**
1704  * notify import/export destroy thread about new zombie.
1705  */
1706 static void obd_zombie_impexp_notify(void)
1707 {
1708         /*
1709          * Make sure obd_zomebie_impexp_thread get this notification.
1710          * It is possible this signal only get by obd_zombie_barrier, and
1711          * barrier gulps this notification and sleeps away and hangs ensues
1712          */
1713         wake_up_all(&obd_zombie_waitq);
1714 }
1715
1716 /**
1717  * check whether obd_zombie is idle
1718  */
1719 static int obd_zombie_is_idle(void)
1720 {
1721         int rc;
1722
1723         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1724         spin_lock(&obd_zombie_impexp_lock);
1725         rc = (zombies_count == 0);
1726         spin_unlock(&obd_zombie_impexp_lock);
1727         return rc;
1728 }
1729
1730 /**
1731  * wait when obd_zombie import/export queues become empty
1732  */
1733 void obd_zombie_barrier(void)
1734 {
1735         struct l_wait_info lwi = { 0 };
1736
1737         if (obd_zombie_pid == current_pid())
1738                 /* don't wait for myself */
1739                 return;
1740         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1741 }
1742 EXPORT_SYMBOL(obd_zombie_barrier);
1743
1744
1745 /**
1746  * destroy zombie export/import thread.
1747  */
1748 static int obd_zombie_impexp_thread(void *unused)
1749 {
1750         unshare_fs_struct();
1751         complete(&obd_zombie_start);
1752
1753         obd_zombie_pid = current_pid();
1754
1755         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1756                 struct l_wait_info lwi = { 0 };
1757
1758                 l_wait_event(obd_zombie_waitq,
1759                              !obd_zombie_impexp_check(NULL), &lwi);
1760                 obd_zombie_impexp_cull();
1761
1762                 /*
1763                  * Notify obd_zombie_barrier callers that queues
1764                  * may be empty.
1765                  */
1766                 wake_up(&obd_zombie_waitq);
1767         }
1768
1769         complete(&obd_zombie_stop);
1770
1771         RETURN(0);
1772 }
1773
1774
1775 /**
1776  * start destroy zombie import/export thread
1777  */
1778 int obd_zombie_impexp_init(void)
1779 {
1780         struct task_struct *task;
1781
1782         INIT_LIST_HEAD(&obd_zombie_imports);
1783
1784         INIT_LIST_HEAD(&obd_zombie_exports);
1785         spin_lock_init(&obd_zombie_impexp_lock);
1786         init_completion(&obd_zombie_start);
1787         init_completion(&obd_zombie_stop);
1788         init_waitqueue_head(&obd_zombie_waitq);
1789         obd_zombie_pid = 0;
1790
1791         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1792         if (IS_ERR(task))
1793                 RETURN(PTR_ERR(task));
1794
1795         wait_for_completion(&obd_zombie_start);
1796         RETURN(0);
1797 }
1798 /**
1799  * stop destroy zombie import/export thread
1800  */
1801 void obd_zombie_impexp_stop(void)
1802 {
1803         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1804         obd_zombie_impexp_notify();
1805         wait_for_completion(&obd_zombie_stop);
1806 }
1807
1808 /***** Kernel-userspace comm helpers *******/
1809
1810 /* Get length of entire message, including header */
1811 int kuc_len(int payload_len)
1812 {
1813         return sizeof(struct kuc_hdr) + payload_len;
1814 }
1815 EXPORT_SYMBOL(kuc_len);
1816
1817 /* Get a pointer to kuc header, given a ptr to the payload
1818  * @param p Pointer to payload area
1819  * @returns Pointer to kuc header
1820  */
1821 struct kuc_hdr * kuc_ptr(void *p)
1822 {
1823         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1824         LASSERT(lh->kuc_magic == KUC_MAGIC);
1825         return lh;
1826 }
1827 EXPORT_SYMBOL(kuc_ptr);
1828
1829 /* Test if payload is part of kuc message
1830  * @param p Pointer to payload area
1831  * @returns boolean
1832  */
1833 int kuc_ispayload(void *p)
1834 {
1835         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1836
1837         if (kh->kuc_magic == KUC_MAGIC)
1838                 return 1;
1839         else
1840                 return 0;
1841 }
1842 EXPORT_SYMBOL(kuc_ispayload);
1843
1844 /* Alloc space for a message, and fill in header
1845  * @return Pointer to payload area
1846  */
1847 void *kuc_alloc(int payload_len, int transport, int type)
1848 {
1849         struct kuc_hdr *lh;
1850         int len = kuc_len(payload_len);
1851
1852         OBD_ALLOC(lh, len);
1853         if (lh == NULL)
1854                 return ERR_PTR(-ENOMEM);
1855
1856         lh->kuc_magic = KUC_MAGIC;
1857         lh->kuc_transport = transport;
1858         lh->kuc_msgtype = type;
1859         lh->kuc_msglen = len;
1860
1861         return (void *)(lh + 1);
1862 }
1863 EXPORT_SYMBOL(kuc_alloc);
1864
1865 /* Takes pointer to payload area */
1866 inline void kuc_free(void *p, int payload_len)
1867 {
1868         struct kuc_hdr *lh = kuc_ptr(p);
1869         OBD_FREE(lh, kuc_len(payload_len));
1870 }
1871 EXPORT_SYMBOL(kuc_free);
1872
1873 struct obd_request_slot_waiter {
1874         struct list_head        orsw_entry;
1875         wait_queue_head_t       orsw_waitq;
1876         bool                    orsw_signaled;
1877 };
1878
1879 static bool obd_request_slot_avail(struct client_obd *cli,
1880                                    struct obd_request_slot_waiter *orsw)
1881 {
1882         bool avail;
1883
1884         spin_lock(&cli->cl_loi_list_lock);
1885         avail = !!list_empty(&orsw->orsw_entry);
1886         spin_unlock(&cli->cl_loi_list_lock);
1887
1888         return avail;
1889 };
1890
1891 /*
1892  * For network flow control, the RPC sponsor needs to acquire a credit
1893  * before sending the RPC. The credits count for a connection is defined
1894  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1895  * the subsequent RPC sponsors need to wait until others released their
1896  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1897  */
1898 int obd_get_request_slot(struct client_obd *cli)
1899 {
1900         struct obd_request_slot_waiter   orsw;
1901         struct l_wait_info               lwi;
1902         int                              rc;
1903
1904         spin_lock(&cli->cl_loi_list_lock);
1905         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1906                 cli->cl_r_in_flight++;
1907                 spin_unlock(&cli->cl_loi_list_lock);
1908                 return 0;
1909         }
1910
1911         init_waitqueue_head(&orsw.orsw_waitq);
1912         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1913         orsw.orsw_signaled = false;
1914         spin_unlock(&cli->cl_loi_list_lock);
1915
1916         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1917         rc = l_wait_event(orsw.orsw_waitq,
1918                           obd_request_slot_avail(cli, &orsw) ||
1919                           orsw.orsw_signaled,
1920                           &lwi);
1921
1922         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1923          * freed but other (such as obd_put_request_slot) is using it. */
1924         spin_lock(&cli->cl_loi_list_lock);
1925         if (rc != 0) {
1926                 if (!orsw.orsw_signaled) {
1927                         if (list_empty(&orsw.orsw_entry))
1928                                 cli->cl_r_in_flight--;
1929                         else
1930                                 list_del(&orsw.orsw_entry);
1931                 }
1932         }
1933
1934         if (orsw.orsw_signaled) {
1935                 LASSERT(list_empty(&orsw.orsw_entry));
1936
1937                 rc = -EINTR;
1938         }
1939         spin_unlock(&cli->cl_loi_list_lock);
1940
1941         return rc;
1942 }
1943 EXPORT_SYMBOL(obd_get_request_slot);
1944
1945 void obd_put_request_slot(struct client_obd *cli)
1946 {
1947         struct obd_request_slot_waiter *orsw;
1948
1949         spin_lock(&cli->cl_loi_list_lock);
1950         cli->cl_r_in_flight--;
1951
1952         /* If there is free slot, wakeup the first waiter. */
1953         if (!list_empty(&cli->cl_loi_read_list) &&
1954             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
1955                 orsw = list_entry(cli->cl_loi_read_list.next,
1956                                   struct obd_request_slot_waiter, orsw_entry);
1957                 list_del_init(&orsw->orsw_entry);
1958                 cli->cl_r_in_flight++;
1959                 wake_up(&orsw->orsw_waitq);
1960         }
1961         spin_unlock(&cli->cl_loi_list_lock);
1962 }
1963 EXPORT_SYMBOL(obd_put_request_slot);
1964
1965 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1966 {
1967         return cli->cl_max_rpcs_in_flight;
1968 }
1969 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
1970
1971 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1972 {
1973         struct obd_request_slot_waiter *orsw;
1974         __u32                           old;
1975         int                             diff;
1976         int                             i;
1977
1978         if (max > OBD_MAX_RIF_MAX || max < 1)
1979                 return -ERANGE;
1980
1981         spin_lock(&cli->cl_loi_list_lock);
1982         old = cli->cl_max_rpcs_in_flight;
1983         cli->cl_max_rpcs_in_flight = max;
1984         diff = max - old;
1985
1986         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
1987         for (i = 0; i < diff; i++) {
1988                 if (list_empty(&cli->cl_loi_read_list))
1989                         break;
1990
1991                 orsw = list_entry(cli->cl_loi_read_list.next,
1992                                   struct obd_request_slot_waiter, orsw_entry);
1993                 list_del_init(&orsw->orsw_entry);
1994                 cli->cl_r_in_flight++;
1995                 wake_up(&orsw->orsw_waitq);
1996         }
1997         spin_unlock(&cli->cl_loi_list_lock);
1998
1999         return 0;
2000 }
2001 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);