Whamcloud - gitweb
LU-5396 obdclass: make some functions static
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include <obd_class.h>
44 #include <lprocfs_status.h>
45
46 spinlock_t obd_types_lock;
47
48 static struct kmem_cache *obd_device_cachep;
49 struct kmem_cache *obdo_cachep;
50 EXPORT_SYMBOL(obdo_cachep);
51 static struct kmem_cache *import_cachep;
52
53 static struct list_head obd_zombie_imports;
54 static struct list_head obd_zombie_exports;
55 static spinlock_t  obd_zombie_impexp_lock;
56
57 static void obd_zombie_impexp_notify(void);
58 static void obd_zombie_export_add(struct obd_export *exp);
59 static void obd_zombie_import_add(struct obd_import *imp);
60 static void print_export_data(struct obd_export *exp,
61                               const char *status, int locks);
62
63 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
64 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
65
66 /*
67  * support functions: we could use inter-module communication, but this
68  * is more portable to other OS's
69  */
70 static struct obd_device *obd_device_alloc(void)
71 {
72         struct obd_device *obd;
73
74         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
75         if (obd != NULL) {
76                 obd->obd_magic = OBD_DEVICE_MAGIC;
77         }
78         return obd;
79 }
80
81 static void obd_device_free(struct obd_device *obd)
82 {
83         LASSERT(obd != NULL);
84         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
85                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
86         if (obd->obd_namespace != NULL) {
87                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
88                        obd, obd->obd_namespace, obd->obd_force);
89                 LBUG();
90         }
91         lu_ref_fini(&obd->obd_reference);
92         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
93 }
94
95 struct obd_type *class_search_type(const char *name)
96 {
97         struct list_head *tmp;
98         struct obd_type *type;
99
100         spin_lock(&obd_types_lock);
101         list_for_each(tmp, &obd_types) {
102                 type = list_entry(tmp, struct obd_type, typ_chain);
103                 if (strcmp(type->typ_name, name) == 0) {
104                         spin_unlock(&obd_types_lock);
105                         return type;
106                 }
107         }
108         spin_unlock(&obd_types_lock);
109         return NULL;
110 }
111 EXPORT_SYMBOL(class_search_type);
112
113 struct obd_type *class_get_type(const char *name)
114 {
115         struct obd_type *type = class_search_type(name);
116
117 #ifdef HAVE_MODULE_LOADING_SUPPORT
118         if (!type) {
119                 const char *modname = name;
120
121                 if (strcmp(modname, "obdfilter") == 0)
122                         modname = "ofd";
123
124                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
125                         modname = LUSTRE_OSP_NAME;
126
127                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
128                         modname = LUSTRE_MDT_NAME;
129
130                 if (!request_module("%s", modname)) {
131                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
132                         type = class_search_type(name);
133                 } else {
134                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
135                                            modname);
136                 }
137         }
138 #endif
139         if (type) {
140                 spin_lock(&type->obd_type_lock);
141                 type->typ_refcnt++;
142                 try_module_get(type->typ_dt_ops->o_owner);
143                 spin_unlock(&type->obd_type_lock);
144         }
145         return type;
146 }
147 EXPORT_SYMBOL(class_get_type);
148
149 void class_put_type(struct obd_type *type)
150 {
151         LASSERT(type);
152         spin_lock(&type->obd_type_lock);
153         type->typ_refcnt--;
154         module_put(type->typ_dt_ops->o_owner);
155         spin_unlock(&type->obd_type_lock);
156 }
157 EXPORT_SYMBOL(class_put_type);
158
159 #define CLASS_MAX_NAME 1024
160
161 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
162                         bool enable_proc, struct lprocfs_seq_vars *vars,
163                         const char *name, struct lu_device_type *ldt)
164 {
165         struct obd_type *type;
166         int rc = 0;
167         ENTRY;
168
169         /* sanity check */
170         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
171
172         if (class_search_type(name)) {
173                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
174                 RETURN(-EEXIST);
175         }
176
177         rc = -ENOMEM;
178         OBD_ALLOC(type, sizeof(*type));
179         if (type == NULL)
180                 RETURN(rc);
181
182         OBD_ALLOC_PTR(type->typ_dt_ops);
183         OBD_ALLOC_PTR(type->typ_md_ops);
184         OBD_ALLOC(type->typ_name, strlen(name) + 1);
185
186         if (type->typ_dt_ops == NULL ||
187             type->typ_md_ops == NULL ||
188             type->typ_name == NULL)
189                 GOTO (failed, rc);
190
191         *(type->typ_dt_ops) = *dt_ops;
192         /* md_ops is optional */
193         if (md_ops)
194                 *(type->typ_md_ops) = *md_ops;
195         strcpy(type->typ_name, name);
196         spin_lock_init(&type->obd_type_lock);
197
198 #ifdef LPROCFS
199         if (enable_proc) {
200                 type->typ_procroot = lprocfs_seq_register(type->typ_name,
201                                                           proc_lustre_root,
202                                                           vars, type);
203                 if (IS_ERR(type->typ_procroot)) {
204                         rc = PTR_ERR(type->typ_procroot);
205                         type->typ_procroot = NULL;
206                         GOTO(failed, rc);
207                 }
208         }
209 #endif
210         if (ldt != NULL) {
211                 type->typ_lu = ldt;
212                 rc = lu_device_type_init(ldt);
213                 if (rc != 0)
214                         GOTO (failed, rc);
215         }
216
217         spin_lock(&obd_types_lock);
218         list_add(&type->typ_chain, &obd_types);
219         spin_unlock(&obd_types_lock);
220
221         RETURN (0);
222
223 failed:
224         if (type->typ_name != NULL) {
225 #ifdef LPROCFS
226                 if (type->typ_procroot != NULL) {
227 #ifndef HAVE_ONLY_PROCFS_SEQ
228                         lprocfs_try_remove_proc_entry(type->typ_name,
229                                                       proc_lustre_root);
230 #else
231                         remove_proc_subtree(type->typ_name, proc_lustre_root);
232 #endif
233                 }
234 #endif
235                 OBD_FREE(type->typ_name, strlen(name) + 1);
236         }
237         if (type->typ_md_ops != NULL)
238                 OBD_FREE_PTR(type->typ_md_ops);
239         if (type->typ_dt_ops != NULL)
240                 OBD_FREE_PTR(type->typ_dt_ops);
241         OBD_FREE(type, sizeof(*type));
242         RETURN(rc);
243 }
244 EXPORT_SYMBOL(class_register_type);
245
246 int class_unregister_type(const char *name)
247 {
248         struct obd_type *type = class_search_type(name);
249         ENTRY;
250
251         if (!type) {
252                 CERROR("unknown obd type\n");
253                 RETURN(-EINVAL);
254         }
255
256         if (type->typ_refcnt) {
257                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
258                 /* This is a bad situation, let's make the best of it */
259                 /* Remove ops, but leave the name for debugging */
260                 OBD_FREE_PTR(type->typ_dt_ops);
261                 OBD_FREE_PTR(type->typ_md_ops);
262                 RETURN(-EBUSY);
263         }
264
265         /* we do not use type->typ_procroot as for compatibility purposes
266          * other modules can share names (i.e. lod can use lov entry). so
267          * we can't reference pointer as it can get invalided when another
268          * module removes the entry */
269 #ifdef LPROCFS
270         if (type->typ_procroot != NULL) {
271 #ifndef HAVE_ONLY_PROCFS_SEQ
272                 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
273 #else
274                 remove_proc_subtree(type->typ_name, proc_lustre_root);
275 #endif
276         }
277
278         if (type->typ_procsym != NULL)
279                 lprocfs_remove(&type->typ_procsym);
280 #endif
281         if (type->typ_lu)
282                 lu_device_type_fini(type->typ_lu);
283
284         spin_lock(&obd_types_lock);
285         list_del(&type->typ_chain);
286         spin_unlock(&obd_types_lock);
287         OBD_FREE(type->typ_name, strlen(name) + 1);
288         if (type->typ_dt_ops != NULL)
289                 OBD_FREE_PTR(type->typ_dt_ops);
290         if (type->typ_md_ops != NULL)
291                 OBD_FREE_PTR(type->typ_md_ops);
292         OBD_FREE(type, sizeof(*type));
293         RETURN(0);
294 } /* class_unregister_type */
295 EXPORT_SYMBOL(class_unregister_type);
296
297 /**
298  * Create a new obd device.
299  *
300  * Find an empty slot in ::obd_devs[], create a new obd device in it.
301  *
302  * \param[in] type_name obd device type string.
303  * \param[in] name      obd device name.
304  *
305  * \retval NULL if create fails, otherwise return the obd device
306  *         pointer created.
307  */
308 struct obd_device *class_newdev(const char *type_name, const char *name)
309 {
310         struct obd_device *result = NULL;
311         struct obd_device *newdev;
312         struct obd_type *type = NULL;
313         int i;
314         int new_obd_minor = 0;
315         ENTRY;
316
317         if (strlen(name) >= MAX_OBD_NAME) {
318                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
319                 RETURN(ERR_PTR(-EINVAL));
320         }
321
322         type = class_get_type(type_name);
323         if (type == NULL){
324                 CERROR("OBD: unknown type: %s\n", type_name);
325                 RETURN(ERR_PTR(-ENODEV));
326         }
327
328         newdev = obd_device_alloc();
329         if (newdev == NULL)
330                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
331
332         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
333
334         write_lock(&obd_dev_lock);
335         for (i = 0; i < class_devno_max(); i++) {
336                 struct obd_device *obd = class_num2obd(i);
337
338                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
339                         CERROR("Device %s already exists at %d, won't add\n",
340                                name, i);
341                         if (result) {
342                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
343                                          "%p obd_magic %08x != %08x\n", result,
344                                          result->obd_magic, OBD_DEVICE_MAGIC);
345                                 LASSERTF(result->obd_minor == new_obd_minor,
346                                          "%p obd_minor %d != %d\n", result,
347                                          result->obd_minor, new_obd_minor);
348
349                                 obd_devs[result->obd_minor] = NULL;
350                                 result->obd_name[0]='\0';
351                          }
352                         result = ERR_PTR(-EEXIST);
353                         break;
354                 }
355                 if (!result && !obd) {
356                         result = newdev;
357                         result->obd_minor = i;
358                         new_obd_minor = i;
359                         result->obd_type = type;
360                         strncpy(result->obd_name, name,
361                                 sizeof(result->obd_name) - 1);
362                         obd_devs[i] = result;
363                 }
364         }
365         write_unlock(&obd_dev_lock);
366
367         if (result == NULL && i >= class_devno_max()) {
368                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
369                        class_devno_max());
370                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
371         }
372
373         if (IS_ERR(result))
374                 GOTO(out, result);
375
376         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
377                result->obd_name, result);
378
379         RETURN(result);
380 out:
381         obd_device_free(newdev);
382 out_type:
383         class_put_type(type);
384         return result;
385 }
386
387 void class_release_dev(struct obd_device *obd)
388 {
389         struct obd_type *obd_type = obd->obd_type;
390
391         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
392                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
393         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
394                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
395         LASSERT(obd_type != NULL);
396
397         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
398                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
399
400         write_lock(&obd_dev_lock);
401         obd_devs[obd->obd_minor] = NULL;
402         write_unlock(&obd_dev_lock);
403         obd_device_free(obd);
404
405         class_put_type(obd_type);
406 }
407
408 int class_name2dev(const char *name)
409 {
410         int i;
411
412         if (!name)
413                 return -1;
414
415         read_lock(&obd_dev_lock);
416         for (i = 0; i < class_devno_max(); i++) {
417                 struct obd_device *obd = class_num2obd(i);
418
419                 if (obd && strcmp(name, obd->obd_name) == 0) {
420                         /* Make sure we finished attaching before we give
421                            out any references */
422                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
423                         if (obd->obd_attached) {
424                                 read_unlock(&obd_dev_lock);
425                                 return i;
426                         }
427                         break;
428                 }
429         }
430         read_unlock(&obd_dev_lock);
431
432         return -1;
433 }
434 EXPORT_SYMBOL(class_name2dev);
435
436 struct obd_device *class_name2obd(const char *name)
437 {
438         int dev = class_name2dev(name);
439
440         if (dev < 0 || dev > class_devno_max())
441                 return NULL;
442         return class_num2obd(dev);
443 }
444 EXPORT_SYMBOL(class_name2obd);
445
446 int class_uuid2dev(struct obd_uuid *uuid)
447 {
448         int i;
449
450         read_lock(&obd_dev_lock);
451         for (i = 0; i < class_devno_max(); i++) {
452                 struct obd_device *obd = class_num2obd(i);
453
454                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
455                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
456                         read_unlock(&obd_dev_lock);
457                         return i;
458                 }
459         }
460         read_unlock(&obd_dev_lock);
461
462         return -1;
463 }
464 EXPORT_SYMBOL(class_uuid2dev);
465
466 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
467 {
468         int dev = class_uuid2dev(uuid);
469         if (dev < 0)
470                 return NULL;
471         return class_num2obd(dev);
472 }
473 EXPORT_SYMBOL(class_uuid2obd);
474
475 /**
476  * Get obd device from ::obd_devs[]
477  *
478  * \param num [in] array index
479  *
480  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
481  *         otherwise return the obd device there.
482  */
483 struct obd_device *class_num2obd(int num)
484 {
485         struct obd_device *obd = NULL;
486
487         if (num < class_devno_max()) {
488                 obd = obd_devs[num];
489                 if (obd == NULL)
490                         return NULL;
491
492                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
493                          "%p obd_magic %08x != %08x\n",
494                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
495                 LASSERTF(obd->obd_minor == num,
496                          "%p obd_minor %0d != %0d\n",
497                          obd, obd->obd_minor, num);
498         }
499
500         return obd;
501 }
502 EXPORT_SYMBOL(class_num2obd);
503
504 /**
505  * Get obd devices count. Device in any
506  *    state are counted
507  * \retval obd device count
508  */
509 int get_devices_count(void)
510 {
511         int index, max_index = class_devno_max(), dev_count = 0;
512
513         read_lock(&obd_dev_lock);
514         for (index = 0; index <= max_index; index++) {
515                 struct obd_device *obd = class_num2obd(index);
516                 if (obd != NULL)
517                         dev_count++;
518         }
519         read_unlock(&obd_dev_lock);
520
521         return dev_count;
522 }
523 EXPORT_SYMBOL(get_devices_count);
524
525 void class_obd_list(void)
526 {
527         char *status;
528         int i;
529
530         read_lock(&obd_dev_lock);
531         for (i = 0; i < class_devno_max(); i++) {
532                 struct obd_device *obd = class_num2obd(i);
533
534                 if (obd == NULL)
535                         continue;
536                 if (obd->obd_stopping)
537                         status = "ST";
538                 else if (obd->obd_set_up)
539                         status = "UP";
540                 else if (obd->obd_attached)
541                         status = "AT";
542                 else
543                         status = "--";
544                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
545                          i, status, obd->obd_type->typ_name,
546                          obd->obd_name, obd->obd_uuid.uuid,
547                          atomic_read(&obd->obd_refcount));
548         }
549         read_unlock(&obd_dev_lock);
550         return;
551 }
552
553 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
554    specified, then only the client with that uuid is returned,
555    otherwise any client connected to the tgt is returned. */
556 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
557                                           const char * typ_name,
558                                           struct obd_uuid *grp_uuid)
559 {
560         int i;
561
562         read_lock(&obd_dev_lock);
563         for (i = 0; i < class_devno_max(); i++) {
564                 struct obd_device *obd = class_num2obd(i);
565
566                 if (obd == NULL)
567                         continue;
568                 if ((strncmp(obd->obd_type->typ_name, typ_name,
569                              strlen(typ_name)) == 0)) {
570                         if (obd_uuid_equals(tgt_uuid,
571                                             &obd->u.cli.cl_target_uuid) &&
572                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
573                                                          &obd->obd_uuid) : 1)) {
574                                 read_unlock(&obd_dev_lock);
575                                 return obd;
576                         }
577                 }
578         }
579         read_unlock(&obd_dev_lock);
580
581         return NULL;
582 }
583 EXPORT_SYMBOL(class_find_client_obd);
584
585 /* Iterate the obd_device list looking devices have grp_uuid. Start
586    searching at *next, and if a device is found, the next index to look
587    at is saved in *next. If next is NULL, then the first matching device
588    will always be returned. */
589 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
590 {
591         int i;
592
593         if (next == NULL)
594                 i = 0;
595         else if (*next >= 0 && *next < class_devno_max())
596                 i = *next;
597         else
598                 return NULL;
599
600         read_lock(&obd_dev_lock);
601         for (; i < class_devno_max(); i++) {
602                 struct obd_device *obd = class_num2obd(i);
603
604                 if (obd == NULL)
605                         continue;
606                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
607                         if (next != NULL)
608                                 *next = i+1;
609                         read_unlock(&obd_dev_lock);
610                         return obd;
611                 }
612         }
613         read_unlock(&obd_dev_lock);
614
615         return NULL;
616 }
617 EXPORT_SYMBOL(class_devices_in_group);
618
619 /**
620  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
621  * adjust sptlrpc settings accordingly.
622  */
623 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
624 {
625         struct obd_device  *obd;
626         const char         *type;
627         int                 i, rc = 0, rc2;
628
629         LASSERT(namelen > 0);
630
631         read_lock(&obd_dev_lock);
632         for (i = 0; i < class_devno_max(); i++) {
633                 obd = class_num2obd(i);
634
635                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
636                         continue;
637
638                 /* only notify mdc, osc, mdt, ost */
639                 type = obd->obd_type->typ_name;
640                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
641                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
642                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
643                     strcmp(type, LUSTRE_OST_NAME) != 0)
644                         continue;
645
646                 if (strncmp(obd->obd_name, fsname, namelen))
647                         continue;
648
649                 class_incref(obd, __FUNCTION__, obd);
650                 read_unlock(&obd_dev_lock);
651                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
652                                          sizeof(KEY_SPTLRPC_CONF),
653                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
654                 rc = rc ? rc : rc2;
655                 class_decref(obd, __FUNCTION__, obd);
656                 read_lock(&obd_dev_lock);
657         }
658         read_unlock(&obd_dev_lock);
659         return rc;
660 }
661 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
662
663 void obd_cleanup_caches(void)
664 {
665         ENTRY;
666         if (obd_device_cachep) {
667                 kmem_cache_destroy(obd_device_cachep);
668                 obd_device_cachep = NULL;
669         }
670         if (obdo_cachep) {
671                 kmem_cache_destroy(obdo_cachep);
672                 obdo_cachep = NULL;
673         }
674         if (import_cachep) {
675                 kmem_cache_destroy(import_cachep);
676                 import_cachep = NULL;
677         }
678         if (capa_cachep) {
679                 kmem_cache_destroy(capa_cachep);
680                 capa_cachep = NULL;
681         }
682         EXIT;
683 }
684
685 int obd_init_caches(void)
686 {
687         int rc;
688         ENTRY;
689
690         LASSERT(obd_device_cachep == NULL);
691         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
692                                               sizeof(struct obd_device),
693                                               0, 0, NULL);
694         if (!obd_device_cachep)
695                 GOTO(out, rc = -ENOMEM);
696
697         LASSERT(obdo_cachep == NULL);
698         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
699                                         0, 0, NULL);
700         if (!obdo_cachep)
701                 GOTO(out, rc = -ENOMEM);
702
703         LASSERT(import_cachep == NULL);
704         import_cachep = kmem_cache_create("ll_import_cache",
705                                           sizeof(struct obd_import),
706                                           0, 0, NULL);
707         if (!import_cachep)
708                 GOTO(out, rc = -ENOMEM);
709
710         LASSERT(capa_cachep == NULL);
711         capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
712                                         0, 0, NULL);
713         if (!capa_cachep)
714                 GOTO(out, rc = -ENOMEM);
715
716         RETURN(0);
717 out:
718         obd_cleanup_caches();
719         RETURN(rc);
720 }
721
722 /* map connection to client */
723 struct obd_export *class_conn2export(struct lustre_handle *conn)
724 {
725         struct obd_export *export;
726         ENTRY;
727
728         if (!conn) {
729                 CDEBUG(D_CACHE, "looking for null handle\n");
730                 RETURN(NULL);
731         }
732
733         if (conn->cookie == -1) {  /* this means assign a new connection */
734                 CDEBUG(D_CACHE, "want a new connection\n");
735                 RETURN(NULL);
736         }
737
738         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
739         export = class_handle2object(conn->cookie, NULL);
740         RETURN(export);
741 }
742 EXPORT_SYMBOL(class_conn2export);
743
744 struct obd_device *class_exp2obd(struct obd_export *exp)
745 {
746         if (exp)
747                 return exp->exp_obd;
748         return NULL;
749 }
750 EXPORT_SYMBOL(class_exp2obd);
751
752 struct obd_device *class_conn2obd(struct lustre_handle *conn)
753 {
754         struct obd_export *export;
755         export = class_conn2export(conn);
756         if (export) {
757                 struct obd_device *obd = export->exp_obd;
758                 class_export_put(export);
759                 return obd;
760         }
761         return NULL;
762 }
763 EXPORT_SYMBOL(class_conn2obd);
764
765 struct obd_import *class_exp2cliimp(struct obd_export *exp)
766 {
767         struct obd_device *obd = exp->exp_obd;
768         if (obd == NULL)
769                 return NULL;
770         return obd->u.cli.cl_import;
771 }
772 EXPORT_SYMBOL(class_exp2cliimp);
773
774 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
775 {
776         struct obd_device *obd = class_conn2obd(conn);
777         if (obd == NULL)
778                 return NULL;
779         return obd->u.cli.cl_import;
780 }
781 EXPORT_SYMBOL(class_conn2cliimp);
782
783 /* Export management functions */
784 static void class_export_destroy(struct obd_export *exp)
785 {
786         struct obd_device *obd = exp->exp_obd;
787         ENTRY;
788
789         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
790         LASSERT(obd != NULL);
791
792         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
793                exp->exp_client_uuid.uuid, obd->obd_name);
794
795         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
796         if (exp->exp_connection)
797                 ptlrpc_put_connection_superhack(exp->exp_connection);
798
799         LASSERT(list_empty(&exp->exp_outstanding_replies));
800         LASSERT(list_empty(&exp->exp_uncommitted_replies));
801         LASSERT(list_empty(&exp->exp_req_replay_queue));
802         LASSERT(list_empty(&exp->exp_hp_rpcs));
803         obd_destroy_export(exp);
804         class_decref(obd, "export", exp);
805
806         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
807         EXIT;
808 }
809
810 static void export_handle_addref(void *export)
811 {
812         class_export_get(export);
813 }
814
815 static struct portals_handle_ops export_handle_ops = {
816         .hop_addref = export_handle_addref,
817         .hop_free   = NULL,
818 };
819
820 struct obd_export *class_export_get(struct obd_export *exp)
821 {
822         atomic_inc(&exp->exp_refcount);
823         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
824                atomic_read(&exp->exp_refcount));
825         return exp;
826 }
827 EXPORT_SYMBOL(class_export_get);
828
829 void class_export_put(struct obd_export *exp)
830 {
831         LASSERT(exp != NULL);
832         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
833         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
834                atomic_read(&exp->exp_refcount) - 1);
835
836         if (atomic_dec_and_test(&exp->exp_refcount)) {
837                 LASSERT(!list_empty(&exp->exp_obd_chain));
838                 CDEBUG(D_IOCTL, "final put %p/%s\n",
839                        exp, exp->exp_client_uuid.uuid);
840
841                 /* release nid stat refererence */
842                 lprocfs_exp_cleanup(exp);
843
844                 obd_zombie_export_add(exp);
845         }
846 }
847 EXPORT_SYMBOL(class_export_put);
848
849 /* Creates a new export, adds it to the hash table, and returns a
850  * pointer to it. The refcount is 2: one for the hash reference, and
851  * one for the pointer returned by this function. */
852 struct obd_export *class_new_export(struct obd_device *obd,
853                                     struct obd_uuid *cluuid)
854 {
855         struct obd_export *export;
856         cfs_hash_t *hash = NULL;
857         int rc = 0;
858         ENTRY;
859
860         OBD_ALLOC_PTR(export);
861         if (!export)
862                 return ERR_PTR(-ENOMEM);
863
864         export->exp_conn_cnt = 0;
865         export->exp_lock_hash = NULL;
866         export->exp_flock_hash = NULL;
867         atomic_set(&export->exp_refcount, 2);
868         atomic_set(&export->exp_rpc_count, 0);
869         atomic_set(&export->exp_cb_count, 0);
870         atomic_set(&export->exp_locks_count, 0);
871 #if LUSTRE_TRACKS_LOCK_EXP_REFS
872         INIT_LIST_HEAD(&export->exp_locks_list);
873         spin_lock_init(&export->exp_locks_list_guard);
874 #endif
875         atomic_set(&export->exp_replay_count, 0);
876         export->exp_obd = obd;
877         INIT_LIST_HEAD(&export->exp_outstanding_replies);
878         spin_lock_init(&export->exp_uncommitted_replies_lock);
879         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
880         INIT_LIST_HEAD(&export->exp_req_replay_queue);
881         INIT_LIST_HEAD(&export->exp_handle.h_link);
882         INIT_LIST_HEAD(&export->exp_hp_rpcs);
883         INIT_LIST_HEAD(&export->exp_reg_rpcs);
884         class_handle_hash(&export->exp_handle, &export_handle_ops);
885         export->exp_last_request_time = cfs_time_current_sec();
886         spin_lock_init(&export->exp_lock);
887         spin_lock_init(&export->exp_rpc_lock);
888         INIT_HLIST_NODE(&export->exp_uuid_hash);
889         INIT_HLIST_NODE(&export->exp_nid_hash);
890         spin_lock_init(&export->exp_bl_list_lock);
891         INIT_LIST_HEAD(&export->exp_bl_list);
892
893         export->exp_sp_peer = LUSTRE_SP_ANY;
894         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
895         export->exp_client_uuid = *cluuid;
896         obd_init_export(export);
897
898         spin_lock(&obd->obd_dev_lock);
899         /* shouldn't happen, but might race */
900         if (obd->obd_stopping)
901                 GOTO(exit_unlock, rc = -ENODEV);
902
903         hash = cfs_hash_getref(obd->obd_uuid_hash);
904         if (hash == NULL)
905                 GOTO(exit_unlock, rc = -ENODEV);
906         spin_unlock(&obd->obd_dev_lock);
907
908         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
909                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
910                 if (rc != 0) {
911                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
912                                       obd->obd_name, cluuid->uuid, rc);
913                         GOTO(exit_err, rc = -EALREADY);
914                 }
915         }
916
917         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
918         spin_lock(&obd->obd_dev_lock);
919         if (obd->obd_stopping) {
920                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
921                 GOTO(exit_unlock, rc = -ENODEV);
922         }
923
924         class_incref(obd, "export", export);
925         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
926         list_add_tail(&export->exp_obd_chain_timed,
927                       &export->exp_obd->obd_exports_timed);
928         export->exp_obd->obd_num_exports++;
929         spin_unlock(&obd->obd_dev_lock);
930         cfs_hash_putref(hash);
931         RETURN(export);
932
933 exit_unlock:
934         spin_unlock(&obd->obd_dev_lock);
935 exit_err:
936         if (hash)
937                 cfs_hash_putref(hash);
938         class_handle_unhash(&export->exp_handle);
939         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
940         obd_destroy_export(export);
941         OBD_FREE_PTR(export);
942         return ERR_PTR(rc);
943 }
944 EXPORT_SYMBOL(class_new_export);
945
946 void class_unlink_export(struct obd_export *exp)
947 {
948         class_handle_unhash(&exp->exp_handle);
949
950         spin_lock(&exp->exp_obd->obd_dev_lock);
951         /* delete an uuid-export hashitem from hashtables */
952         if (!hlist_unhashed(&exp->exp_uuid_hash))
953                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
954                              &exp->exp_client_uuid,
955                              &exp->exp_uuid_hash);
956
957         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
958         list_del_init(&exp->exp_obd_chain_timed);
959         exp->exp_obd->obd_num_exports--;
960         spin_unlock(&exp->exp_obd->obd_dev_lock);
961         class_export_put(exp);
962 }
963 EXPORT_SYMBOL(class_unlink_export);
964
965 /* Import management functions */
966 static void class_import_destroy(struct obd_import *imp)
967 {
968         ENTRY;
969
970         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
971                 imp->imp_obd->obd_name);
972
973         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
974
975         ptlrpc_put_connection_superhack(imp->imp_connection);
976
977         while (!list_empty(&imp->imp_conn_list)) {
978                 struct obd_import_conn *imp_conn;
979
980                 imp_conn = list_entry(imp->imp_conn_list.next,
981                                       struct obd_import_conn, oic_item);
982                 list_del_init(&imp_conn->oic_item);
983                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
984                 OBD_FREE(imp_conn, sizeof(*imp_conn));
985         }
986
987         LASSERT(imp->imp_sec == NULL);
988         class_decref(imp->imp_obd, "import", imp);
989         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
990         EXIT;
991 }
992
993 static void import_handle_addref(void *import)
994 {
995         class_import_get(import);
996 }
997
998 static struct portals_handle_ops import_handle_ops = {
999         .hop_addref = import_handle_addref,
1000         .hop_free   = NULL,
1001 };
1002
1003 struct obd_import *class_import_get(struct obd_import *import)
1004 {
1005         atomic_inc(&import->imp_refcount);
1006         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1007                atomic_read(&import->imp_refcount),
1008                import->imp_obd->obd_name);
1009         return import;
1010 }
1011 EXPORT_SYMBOL(class_import_get);
1012
1013 void class_import_put(struct obd_import *imp)
1014 {
1015         ENTRY;
1016
1017         LASSERT(list_empty(&imp->imp_zombie_chain));
1018         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1019
1020         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1021                atomic_read(&imp->imp_refcount) - 1,
1022                imp->imp_obd->obd_name);
1023
1024         if (atomic_dec_and_test(&imp->imp_refcount)) {
1025                 CDEBUG(D_INFO, "final put import %p\n", imp);
1026                 obd_zombie_import_add(imp);
1027         }
1028
1029         /* catch possible import put race */
1030         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1031         EXIT;
1032 }
1033 EXPORT_SYMBOL(class_import_put);
1034
1035 static void init_imp_at(struct imp_at *at) {
1036         int i;
1037         at_init(&at->iat_net_latency, 0, 0);
1038         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1039                 /* max service estimates are tracked on the server side, so
1040                    don't use the AT history here, just use the last reported
1041                    val. (But keep hist for proc histogram, worst_ever) */
1042                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1043                         AT_FLG_NOHIST);
1044         }
1045 }
1046
1047 struct obd_import *class_new_import(struct obd_device *obd)
1048 {
1049         struct obd_import *imp;
1050
1051         OBD_ALLOC(imp, sizeof(*imp));
1052         if (imp == NULL)
1053                 return NULL;
1054
1055         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1056         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1057         INIT_LIST_HEAD(&imp->imp_replay_list);
1058         INIT_LIST_HEAD(&imp->imp_sending_list);
1059         INIT_LIST_HEAD(&imp->imp_delayed_list);
1060         INIT_LIST_HEAD(&imp->imp_committed_list);
1061         imp->imp_replay_cursor = &imp->imp_committed_list;
1062         spin_lock_init(&imp->imp_lock);
1063         imp->imp_last_success_conn = 0;
1064         imp->imp_state = LUSTRE_IMP_NEW;
1065         imp->imp_obd = class_incref(obd, "import", imp);
1066         mutex_init(&imp->imp_sec_mutex);
1067         init_waitqueue_head(&imp->imp_recovery_waitq);
1068
1069         atomic_set(&imp->imp_refcount, 2);
1070         atomic_set(&imp->imp_unregistering, 0);
1071         atomic_set(&imp->imp_inflight, 0);
1072         atomic_set(&imp->imp_replay_inflight, 0);
1073         atomic_set(&imp->imp_inval_count, 0);
1074         INIT_LIST_HEAD(&imp->imp_conn_list);
1075         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1076         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1077         init_imp_at(&imp->imp_at);
1078
1079         /* the default magic is V2, will be used in connect RPC, and
1080          * then adjusted according to the flags in request/reply. */
1081         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1082
1083         return imp;
1084 }
1085 EXPORT_SYMBOL(class_new_import);
1086
1087 void class_destroy_import(struct obd_import *import)
1088 {
1089         LASSERT(import != NULL);
1090         LASSERT(import != LP_POISON);
1091
1092         class_handle_unhash(&import->imp_handle);
1093
1094         spin_lock(&import->imp_lock);
1095         import->imp_generation++;
1096         spin_unlock(&import->imp_lock);
1097         class_import_put(import);
1098 }
1099 EXPORT_SYMBOL(class_destroy_import);
1100
1101 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1102
1103 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1104 {
1105         spin_lock(&exp->exp_locks_list_guard);
1106
1107         LASSERT(lock->l_exp_refs_nr >= 0);
1108
1109         if (lock->l_exp_refs_target != NULL &&
1110             lock->l_exp_refs_target != exp) {
1111                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1112                               exp, lock, lock->l_exp_refs_target);
1113         }
1114         if ((lock->l_exp_refs_nr ++) == 0) {
1115                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1116                 lock->l_exp_refs_target = exp;
1117         }
1118         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1119                lock, exp, lock->l_exp_refs_nr);
1120         spin_unlock(&exp->exp_locks_list_guard);
1121 }
1122 EXPORT_SYMBOL(__class_export_add_lock_ref);
1123
1124 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1125 {
1126         spin_lock(&exp->exp_locks_list_guard);
1127         LASSERT(lock->l_exp_refs_nr > 0);
1128         if (lock->l_exp_refs_target != exp) {
1129                 LCONSOLE_WARN("lock %p, "
1130                               "mismatching export pointers: %p, %p\n",
1131                               lock, lock->l_exp_refs_target, exp);
1132         }
1133         if (-- lock->l_exp_refs_nr == 0) {
1134                 list_del_init(&lock->l_exp_refs_link);
1135                 lock->l_exp_refs_target = NULL;
1136         }
1137         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1138                lock, exp, lock->l_exp_refs_nr);
1139         spin_unlock(&exp->exp_locks_list_guard);
1140 }
1141 EXPORT_SYMBOL(__class_export_del_lock_ref);
1142 #endif
1143
1144 /* A connection defines an export context in which preallocation can
1145    be managed. This releases the export pointer reference, and returns
1146    the export handle, so the export refcount is 1 when this function
1147    returns. */
1148 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1149                   struct obd_uuid *cluuid)
1150 {
1151         struct obd_export *export;
1152         LASSERT(conn != NULL);
1153         LASSERT(obd != NULL);
1154         LASSERT(cluuid != NULL);
1155         ENTRY;
1156
1157         export = class_new_export(obd, cluuid);
1158         if (IS_ERR(export))
1159                 RETURN(PTR_ERR(export));
1160
1161         conn->cookie = export->exp_handle.h_cookie;
1162         class_export_put(export);
1163
1164         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1165                cluuid->uuid, conn->cookie);
1166         RETURN(0);
1167 }
1168 EXPORT_SYMBOL(class_connect);
1169
1170 /* if export is involved in recovery then clean up related things */
1171 static void class_export_recovery_cleanup(struct obd_export *exp)
1172 {
1173         struct obd_device *obd = exp->exp_obd;
1174
1175         spin_lock(&obd->obd_recovery_task_lock);
1176         if (obd->obd_recovering) {
1177                 if (exp->exp_in_recovery) {
1178                         spin_lock(&exp->exp_lock);
1179                         exp->exp_in_recovery = 0;
1180                         spin_unlock(&exp->exp_lock);
1181                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1182                         atomic_dec(&obd->obd_connected_clients);
1183                 }
1184
1185                 /* if called during recovery then should update
1186                  * obd_stale_clients counter,
1187                  * lightweight exports are not counted */
1188                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1189                         exp->exp_obd->obd_stale_clients++;
1190         }
1191         spin_unlock(&obd->obd_recovery_task_lock);
1192
1193         spin_lock(&exp->exp_lock);
1194         /** Cleanup req replay fields */
1195         if (exp->exp_req_replay_needed) {
1196                 exp->exp_req_replay_needed = 0;
1197
1198                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1199                 atomic_dec(&obd->obd_req_replay_clients);
1200         }
1201
1202         /** Cleanup lock replay data */
1203         if (exp->exp_lock_replay_needed) {
1204                 exp->exp_lock_replay_needed = 0;
1205
1206                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1207                 atomic_dec(&obd->obd_lock_replay_clients);
1208         }
1209         spin_unlock(&exp->exp_lock);
1210 }
1211
1212 /* This function removes 1-3 references from the export:
1213  * 1 - for export pointer passed
1214  * and if disconnect really need
1215  * 2 - removing from hash
1216  * 3 - in client_unlink_export
1217  * The export pointer passed to this function can destroyed */
1218 int class_disconnect(struct obd_export *export)
1219 {
1220         int already_disconnected;
1221         ENTRY;
1222
1223         if (export == NULL) {
1224                 CWARN("attempting to free NULL export %p\n", export);
1225                 RETURN(-EINVAL);
1226         }
1227
1228         spin_lock(&export->exp_lock);
1229         already_disconnected = export->exp_disconnected;
1230         export->exp_disconnected = 1;
1231         spin_unlock(&export->exp_lock);
1232
1233         /* class_cleanup(), abort_recovery(), and class_fail_export()
1234          * all end up in here, and if any of them race we shouldn't
1235          * call extra class_export_puts(). */
1236         if (already_disconnected) {
1237                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1238                 GOTO(no_disconn, already_disconnected);
1239         }
1240
1241         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1242                export->exp_handle.h_cookie);
1243
1244         if (!hlist_unhashed(&export->exp_nid_hash))
1245                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1246                              &export->exp_connection->c_peer.nid,
1247                              &export->exp_nid_hash);
1248
1249         class_export_recovery_cleanup(export);
1250         class_unlink_export(export);
1251 no_disconn:
1252         class_export_put(export);
1253         RETURN(0);
1254 }
1255 EXPORT_SYMBOL(class_disconnect);
1256
1257 /* Return non-zero for a fully connected export */
1258 int class_connected_export(struct obd_export *exp)
1259 {
1260         int connected = 0;
1261
1262         if (exp) {
1263                 spin_lock(&exp->exp_lock);
1264                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1265                 spin_unlock(&exp->exp_lock);
1266         }
1267         return connected;
1268 }
1269 EXPORT_SYMBOL(class_connected_export);
1270
1271 static void class_disconnect_export_list(struct list_head *list,
1272                                          enum obd_option flags)
1273 {
1274         int rc;
1275         struct obd_export *exp;
1276         ENTRY;
1277
1278         /* It's possible that an export may disconnect itself, but
1279          * nothing else will be added to this list. */
1280         while (!list_empty(list)) {
1281                 exp = list_entry(list->next, struct obd_export,
1282                                  exp_obd_chain);
1283                 /* need for safe call CDEBUG after obd_disconnect */
1284                 class_export_get(exp);
1285
1286                 spin_lock(&exp->exp_lock);
1287                 exp->exp_flags = flags;
1288                 spin_unlock(&exp->exp_lock);
1289
1290                 if (obd_uuid_equals(&exp->exp_client_uuid,
1291                                     &exp->exp_obd->obd_uuid)) {
1292                         CDEBUG(D_HA,
1293                                "exp %p export uuid == obd uuid, don't discon\n",
1294                                exp);
1295                         /* Need to delete this now so we don't end up pointing
1296                          * to work_list later when this export is cleaned up. */
1297                         list_del_init(&exp->exp_obd_chain);
1298                         class_export_put(exp);
1299                         continue;
1300                 }
1301
1302                 class_export_get(exp);
1303                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1304                        "last request at "CFS_TIME_T"\n",
1305                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1306                        exp, exp->exp_last_request_time);
1307                 /* release one export reference anyway */
1308                 rc = obd_disconnect(exp);
1309
1310                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1311                        obd_export_nid2str(exp), exp, rc);
1312                 class_export_put(exp);
1313         }
1314         EXIT;
1315 }
1316
1317 void class_disconnect_exports(struct obd_device *obd)
1318 {
1319         struct list_head work_list;
1320         ENTRY;
1321
1322         /* Move all of the exports from obd_exports to a work list, en masse. */
1323         INIT_LIST_HEAD(&work_list);
1324         spin_lock(&obd->obd_dev_lock);
1325         list_splice_init(&obd->obd_exports, &work_list);
1326         list_splice_init(&obd->obd_delayed_exports, &work_list);
1327         spin_unlock(&obd->obd_dev_lock);
1328
1329         if (!list_empty(&work_list)) {
1330                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1331                        "disconnecting them\n", obd->obd_minor, obd);
1332                 class_disconnect_export_list(&work_list,
1333                                              exp_flags_from_obd(obd));
1334         } else
1335                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1336                        obd->obd_minor, obd);
1337         EXIT;
1338 }
1339 EXPORT_SYMBOL(class_disconnect_exports);
1340
1341 /* Remove exports that have not completed recovery.
1342  */
1343 void class_disconnect_stale_exports(struct obd_device *obd,
1344                                     int (*test_export)(struct obd_export *))
1345 {
1346         struct list_head work_list;
1347         struct obd_export *exp, *n;
1348         int evicted = 0;
1349         ENTRY;
1350
1351         INIT_LIST_HEAD(&work_list);
1352         spin_lock(&obd->obd_dev_lock);
1353         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1354                                  exp_obd_chain) {
1355                 /* don't count self-export as client */
1356                 if (obd_uuid_equals(&exp->exp_client_uuid,
1357                                     &exp->exp_obd->obd_uuid))
1358                         continue;
1359
1360                 /* don't evict clients which have no slot in last_rcvd
1361                  * (e.g. lightweight connection) */
1362                 if (exp->exp_target_data.ted_lr_idx == -1)
1363                         continue;
1364
1365                 spin_lock(&exp->exp_lock);
1366                 if (exp->exp_failed || test_export(exp)) {
1367                         spin_unlock(&exp->exp_lock);
1368                         continue;
1369                 }
1370                 exp->exp_failed = 1;
1371                 spin_unlock(&exp->exp_lock);
1372
1373                 list_move(&exp->exp_obd_chain, &work_list);
1374                 evicted++;
1375                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1376                        obd->obd_name, exp->exp_client_uuid.uuid,
1377                        exp->exp_connection == NULL ? "<unknown>" :
1378                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1379                 print_export_data(exp, "EVICTING", 0);
1380         }
1381         spin_unlock(&obd->obd_dev_lock);
1382
1383         if (evicted)
1384                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1385                               obd->obd_name, evicted);
1386
1387         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1388                                                  OBD_OPT_ABORT_RECOV);
1389         EXIT;
1390 }
1391 EXPORT_SYMBOL(class_disconnect_stale_exports);
1392
1393 void class_fail_export(struct obd_export *exp)
1394 {
1395         int rc, already_failed;
1396
1397         spin_lock(&exp->exp_lock);
1398         already_failed = exp->exp_failed;
1399         exp->exp_failed = 1;
1400         spin_unlock(&exp->exp_lock);
1401
1402         if (already_failed) {
1403                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1404                        exp, exp->exp_client_uuid.uuid);
1405                 return;
1406         }
1407
1408         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1409                exp, exp->exp_client_uuid.uuid);
1410
1411         if (obd_dump_on_timeout)
1412                 libcfs_debug_dumplog();
1413
1414         /* need for safe call CDEBUG after obd_disconnect */
1415         class_export_get(exp);
1416
1417         /* Most callers into obd_disconnect are removing their own reference
1418          * (request, for example) in addition to the one from the hash table.
1419          * We don't have such a reference here, so make one. */
1420         class_export_get(exp);
1421         rc = obd_disconnect(exp);
1422         if (rc)
1423                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1424         else
1425                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1426                        exp, exp->exp_client_uuid.uuid);
1427         class_export_put(exp);
1428 }
1429 EXPORT_SYMBOL(class_fail_export);
1430
1431 char *obd_export_nid2str(struct obd_export *exp)
1432 {
1433         if (exp->exp_connection != NULL)
1434                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1435
1436         return "(no nid)";
1437 }
1438 EXPORT_SYMBOL(obd_export_nid2str);
1439
1440 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1441 {
1442         cfs_hash_t *nid_hash;
1443         struct obd_export *doomed_exp = NULL;
1444         int exports_evicted = 0;
1445
1446         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1447
1448         spin_lock(&obd->obd_dev_lock);
1449         /* umount has run already, so evict thread should leave
1450          * its task to umount thread now */
1451         if (obd->obd_stopping) {
1452                 spin_unlock(&obd->obd_dev_lock);
1453                 return exports_evicted;
1454         }
1455         nid_hash = obd->obd_nid_hash;
1456         cfs_hash_getref(nid_hash);
1457         spin_unlock(&obd->obd_dev_lock);
1458
1459         do {
1460                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1461                 if (doomed_exp == NULL)
1462                         break;
1463
1464                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1465                          "nid %s found, wanted nid %s, requested nid %s\n",
1466                          obd_export_nid2str(doomed_exp),
1467                          libcfs_nid2str(nid_key), nid);
1468                 LASSERTF(doomed_exp != obd->obd_self_export,
1469                          "self-export is hashed by NID?\n");
1470                 exports_evicted++;
1471                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1472                               "request\n", obd->obd_name,
1473                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1474                               obd_export_nid2str(doomed_exp));
1475                 class_fail_export(doomed_exp);
1476                 class_export_put(doomed_exp);
1477         } while (1);
1478
1479         cfs_hash_putref(nid_hash);
1480
1481         if (!exports_evicted)
1482                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1483                        obd->obd_name, nid);
1484         return exports_evicted;
1485 }
1486 EXPORT_SYMBOL(obd_export_evict_by_nid);
1487
1488 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1489 {
1490         cfs_hash_t *uuid_hash;
1491         struct obd_export *doomed_exp = NULL;
1492         struct obd_uuid doomed_uuid;
1493         int exports_evicted = 0;
1494
1495         spin_lock(&obd->obd_dev_lock);
1496         if (obd->obd_stopping) {
1497                 spin_unlock(&obd->obd_dev_lock);
1498                 return exports_evicted;
1499         }
1500         uuid_hash = obd->obd_uuid_hash;
1501         cfs_hash_getref(uuid_hash);
1502         spin_unlock(&obd->obd_dev_lock);
1503
1504         obd_str2uuid(&doomed_uuid, uuid);
1505         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1506                 CERROR("%s: can't evict myself\n", obd->obd_name);
1507                 cfs_hash_putref(uuid_hash);
1508                 return exports_evicted;
1509         }
1510
1511         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1512
1513         if (doomed_exp == NULL) {
1514                 CERROR("%s: can't disconnect %s: no exports found\n",
1515                        obd->obd_name, uuid);
1516         } else {
1517                 CWARN("%s: evicting %s at adminstrative request\n",
1518                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1519                 class_fail_export(doomed_exp);
1520                 class_export_put(doomed_exp);
1521                 exports_evicted++;
1522         }
1523         cfs_hash_putref(uuid_hash);
1524
1525         return exports_evicted;
1526 }
1527 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1528
1529 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1530 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1531 EXPORT_SYMBOL(class_export_dump_hook);
1532 #endif
1533
1534 static void print_export_data(struct obd_export *exp, const char *status,
1535                               int locks)
1536 {
1537         struct ptlrpc_reply_state *rs;
1538         struct ptlrpc_reply_state *first_reply = NULL;
1539         int nreplies = 0;
1540
1541         spin_lock(&exp->exp_lock);
1542         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1543                             rs_exp_list) {
1544                 if (nreplies == 0)
1545                         first_reply = rs;
1546                 nreplies++;
1547         }
1548         spin_unlock(&exp->exp_lock);
1549
1550         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1551                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1552                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1553                atomic_read(&exp->exp_rpc_count),
1554                atomic_read(&exp->exp_cb_count),
1555                atomic_read(&exp->exp_locks_count),
1556                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1557                nreplies, first_reply, nreplies > 3 ? "..." : "",
1558                exp->exp_last_committed);
1559 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1560         if (locks && class_export_dump_hook != NULL)
1561                 class_export_dump_hook(exp);
1562 #endif
1563 }
1564
1565 void dump_exports(struct obd_device *obd, int locks)
1566 {
1567         struct obd_export *exp;
1568
1569         spin_lock(&obd->obd_dev_lock);
1570         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1571                 print_export_data(exp, "ACTIVE", locks);
1572         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1573                 print_export_data(exp, "UNLINKED", locks);
1574         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1575                 print_export_data(exp, "DELAYED", locks);
1576         spin_unlock(&obd->obd_dev_lock);
1577         spin_lock(&obd_zombie_impexp_lock);
1578         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1579                 print_export_data(exp, "ZOMBIE", locks);
1580         spin_unlock(&obd_zombie_impexp_lock);
1581 }
1582 EXPORT_SYMBOL(dump_exports);
1583
1584 void obd_exports_barrier(struct obd_device *obd)
1585 {
1586         int waited = 2;
1587         LASSERT(list_empty(&obd->obd_exports));
1588         spin_lock(&obd->obd_dev_lock);
1589         while (!list_empty(&obd->obd_unlinked_exports)) {
1590                 spin_unlock(&obd->obd_dev_lock);
1591                 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1592                                                    cfs_time_seconds(waited));
1593                 if (waited > 5 && IS_PO2(waited)) {
1594                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1595                                       "more than %d seconds. "
1596                                       "The obd refcount = %d. Is it stuck?\n",
1597                                       obd->obd_name, waited,
1598                                       atomic_read(&obd->obd_refcount));
1599                         dump_exports(obd, 1);
1600                 }
1601                 waited *= 2;
1602                 spin_lock(&obd->obd_dev_lock);
1603         }
1604         spin_unlock(&obd->obd_dev_lock);
1605 }
1606 EXPORT_SYMBOL(obd_exports_barrier);
1607
1608 /* Total amount of zombies to be destroyed */
1609 static int zombies_count = 0;
1610
1611 /**
1612  * kill zombie imports and exports
1613  */
1614 void obd_zombie_impexp_cull(void)
1615 {
1616         struct obd_import *import;
1617         struct obd_export *export;
1618         ENTRY;
1619
1620         do {
1621                 spin_lock(&obd_zombie_impexp_lock);
1622
1623                 import = NULL;
1624                 if (!list_empty(&obd_zombie_imports)) {
1625                         import = list_entry(obd_zombie_imports.next,
1626                                             struct obd_import,
1627                                             imp_zombie_chain);
1628                         list_del_init(&import->imp_zombie_chain);
1629                 }
1630
1631                 export = NULL;
1632                 if (!list_empty(&obd_zombie_exports)) {
1633                         export = list_entry(obd_zombie_exports.next,
1634                                             struct obd_export,
1635                                             exp_obd_chain);
1636                         list_del_init(&export->exp_obd_chain);
1637                 }
1638
1639                 spin_unlock(&obd_zombie_impexp_lock);
1640
1641                 if (import != NULL) {
1642                         class_import_destroy(import);
1643                         spin_lock(&obd_zombie_impexp_lock);
1644                         zombies_count--;
1645                         spin_unlock(&obd_zombie_impexp_lock);
1646                 }
1647
1648                 if (export != NULL) {
1649                         class_export_destroy(export);
1650                         spin_lock(&obd_zombie_impexp_lock);
1651                         zombies_count--;
1652                         spin_unlock(&obd_zombie_impexp_lock);
1653                 }
1654
1655                 cond_resched();
1656         } while (import != NULL || export != NULL);
1657         EXIT;
1658 }
1659
1660 static struct completion        obd_zombie_start;
1661 static struct completion        obd_zombie_stop;
1662 static unsigned long            obd_zombie_flags;
1663 static wait_queue_head_t        obd_zombie_waitq;
1664 static pid_t                    obd_zombie_pid;
1665
1666 enum {
1667         OBD_ZOMBIE_STOP         = 0x0001,
1668 };
1669
1670 /**
1671  * check for work for kill zombie import/export thread.
1672  */
1673 static int obd_zombie_impexp_check(void *arg)
1674 {
1675         int rc;
1676
1677         spin_lock(&obd_zombie_impexp_lock);
1678         rc = (zombies_count == 0) &&
1679              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1680         spin_unlock(&obd_zombie_impexp_lock);
1681
1682         RETURN(rc);
1683 }
1684
1685 /**
1686  * Add export to the obd_zombe thread and notify it.
1687  */
1688 static void obd_zombie_export_add(struct obd_export *exp) {
1689         spin_lock(&exp->exp_obd->obd_dev_lock);
1690         LASSERT(!list_empty(&exp->exp_obd_chain));
1691         list_del_init(&exp->exp_obd_chain);
1692         spin_unlock(&exp->exp_obd->obd_dev_lock);
1693         spin_lock(&obd_zombie_impexp_lock);
1694         zombies_count++;
1695         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1696         spin_unlock(&obd_zombie_impexp_lock);
1697
1698         obd_zombie_impexp_notify();
1699 }
1700
1701 /**
1702  * Add import to the obd_zombe thread and notify it.
1703  */
1704 static void obd_zombie_import_add(struct obd_import *imp) {
1705         LASSERT(imp->imp_sec == NULL);
1706         LASSERT(imp->imp_rq_pool == NULL);
1707         spin_lock(&obd_zombie_impexp_lock);
1708         LASSERT(list_empty(&imp->imp_zombie_chain));
1709         zombies_count++;
1710         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1711         spin_unlock(&obd_zombie_impexp_lock);
1712
1713         obd_zombie_impexp_notify();
1714 }
1715
1716 /**
1717  * notify import/export destroy thread about new zombie.
1718  */
1719 static void obd_zombie_impexp_notify(void)
1720 {
1721         /*
1722          * Make sure obd_zomebie_impexp_thread get this notification.
1723          * It is possible this signal only get by obd_zombie_barrier, and
1724          * barrier gulps this notification and sleeps away and hangs ensues
1725          */
1726         wake_up_all(&obd_zombie_waitq);
1727 }
1728
1729 /**
1730  * check whether obd_zombie is idle
1731  */
1732 static int obd_zombie_is_idle(void)
1733 {
1734         int rc;
1735
1736         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1737         spin_lock(&obd_zombie_impexp_lock);
1738         rc = (zombies_count == 0);
1739         spin_unlock(&obd_zombie_impexp_lock);
1740         return rc;
1741 }
1742
1743 /**
1744  * wait when obd_zombie import/export queues become empty
1745  */
1746 void obd_zombie_barrier(void)
1747 {
1748         struct l_wait_info lwi = { 0 };
1749
1750         if (obd_zombie_pid == current_pid())
1751                 /* don't wait for myself */
1752                 return;
1753         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1754 }
1755 EXPORT_SYMBOL(obd_zombie_barrier);
1756
1757
1758 /**
1759  * destroy zombie export/import thread.
1760  */
1761 static int obd_zombie_impexp_thread(void *unused)
1762 {
1763         unshare_fs_struct();
1764         complete(&obd_zombie_start);
1765
1766         obd_zombie_pid = current_pid();
1767
1768         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1769                 struct l_wait_info lwi = { 0 };
1770
1771                 l_wait_event(obd_zombie_waitq,
1772                              !obd_zombie_impexp_check(NULL), &lwi);
1773                 obd_zombie_impexp_cull();
1774
1775                 /*
1776                  * Notify obd_zombie_barrier callers that queues
1777                  * may be empty.
1778                  */
1779                 wake_up(&obd_zombie_waitq);
1780         }
1781
1782         complete(&obd_zombie_stop);
1783
1784         RETURN(0);
1785 }
1786
1787
1788 /**
1789  * start destroy zombie import/export thread
1790  */
1791 int obd_zombie_impexp_init(void)
1792 {
1793         struct task_struct *task;
1794
1795         INIT_LIST_HEAD(&obd_zombie_imports);
1796
1797         INIT_LIST_HEAD(&obd_zombie_exports);
1798         spin_lock_init(&obd_zombie_impexp_lock);
1799         init_completion(&obd_zombie_start);
1800         init_completion(&obd_zombie_stop);
1801         init_waitqueue_head(&obd_zombie_waitq);
1802         obd_zombie_pid = 0;
1803
1804         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1805         if (IS_ERR(task))
1806                 RETURN(PTR_ERR(task));
1807
1808         wait_for_completion(&obd_zombie_start);
1809         RETURN(0);
1810 }
1811 /**
1812  * stop destroy zombie import/export thread
1813  */
1814 void obd_zombie_impexp_stop(void)
1815 {
1816         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1817         obd_zombie_impexp_notify();
1818         wait_for_completion(&obd_zombie_stop);
1819 }
1820
1821 /***** Kernel-userspace comm helpers *******/
1822
1823 /* Get length of entire message, including header */
1824 int kuc_len(int payload_len)
1825 {
1826         return sizeof(struct kuc_hdr) + payload_len;
1827 }
1828 EXPORT_SYMBOL(kuc_len);
1829
1830 /* Get a pointer to kuc header, given a ptr to the payload
1831  * @param p Pointer to payload area
1832  * @returns Pointer to kuc header
1833  */
1834 struct kuc_hdr * kuc_ptr(void *p)
1835 {
1836         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1837         LASSERT(lh->kuc_magic == KUC_MAGIC);
1838         return lh;
1839 }
1840 EXPORT_SYMBOL(kuc_ptr);
1841
1842 /* Test if payload is part of kuc message
1843  * @param p Pointer to payload area
1844  * @returns boolean
1845  */
1846 int kuc_ispayload(void *p)
1847 {
1848         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1849
1850         if (kh->kuc_magic == KUC_MAGIC)
1851                 return 1;
1852         else
1853                 return 0;
1854 }
1855 EXPORT_SYMBOL(kuc_ispayload);
1856
1857 /* Alloc space for a message, and fill in header
1858  * @return Pointer to payload area
1859  */
1860 void *kuc_alloc(int payload_len, int transport, int type)
1861 {
1862         struct kuc_hdr *lh;
1863         int len = kuc_len(payload_len);
1864
1865         OBD_ALLOC(lh, len);
1866         if (lh == NULL)
1867                 return ERR_PTR(-ENOMEM);
1868
1869         lh->kuc_magic = KUC_MAGIC;
1870         lh->kuc_transport = transport;
1871         lh->kuc_msgtype = type;
1872         lh->kuc_msglen = len;
1873
1874         return (void *)(lh + 1);
1875 }
1876 EXPORT_SYMBOL(kuc_alloc);
1877
1878 /* Takes pointer to payload area */
1879 inline void kuc_free(void *p, int payload_len)
1880 {
1881         struct kuc_hdr *lh = kuc_ptr(p);
1882         OBD_FREE(lh, kuc_len(payload_len));
1883 }
1884 EXPORT_SYMBOL(kuc_free);
1885
1886 struct obd_request_slot_waiter {
1887         struct list_head        orsw_entry;
1888         wait_queue_head_t       orsw_waitq;
1889         bool                    orsw_signaled;
1890 };
1891
1892 static bool obd_request_slot_avail(struct client_obd *cli,
1893                                    struct obd_request_slot_waiter *orsw)
1894 {
1895         bool avail;
1896
1897         spin_lock(&cli->cl_loi_list_lock);
1898         avail = !!list_empty(&orsw->orsw_entry);
1899         spin_unlock(&cli->cl_loi_list_lock);
1900
1901         return avail;
1902 };
1903
1904 /*
1905  * For network flow control, the RPC sponsor needs to acquire a credit
1906  * before sending the RPC. The credits count for a connection is defined
1907  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1908  * the subsequent RPC sponsors need to wait until others released their
1909  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1910  */
1911 int obd_get_request_slot(struct client_obd *cli)
1912 {
1913         struct obd_request_slot_waiter   orsw;
1914         struct l_wait_info               lwi;
1915         int                              rc;
1916
1917         spin_lock(&cli->cl_loi_list_lock);
1918         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1919                 cli->cl_r_in_flight++;
1920                 spin_unlock(&cli->cl_loi_list_lock);
1921                 return 0;
1922         }
1923
1924         init_waitqueue_head(&orsw.orsw_waitq);
1925         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1926         orsw.orsw_signaled = false;
1927         spin_unlock(&cli->cl_loi_list_lock);
1928
1929         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1930         rc = l_wait_event(orsw.orsw_waitq,
1931                           obd_request_slot_avail(cli, &orsw) ||
1932                           orsw.orsw_signaled,
1933                           &lwi);
1934
1935         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1936          * freed but other (such as obd_put_request_slot) is using it. */
1937         spin_lock(&cli->cl_loi_list_lock);
1938         if (rc != 0) {
1939                 if (!orsw.orsw_signaled) {
1940                         if (list_empty(&orsw.orsw_entry))
1941                                 cli->cl_r_in_flight--;
1942                         else
1943                                 list_del(&orsw.orsw_entry);
1944                 }
1945         }
1946
1947         if (orsw.orsw_signaled) {
1948                 LASSERT(list_empty(&orsw.orsw_entry));
1949
1950                 rc = -EINTR;
1951         }
1952         spin_unlock(&cli->cl_loi_list_lock);
1953
1954         return rc;
1955 }
1956 EXPORT_SYMBOL(obd_get_request_slot);
1957
1958 void obd_put_request_slot(struct client_obd *cli)
1959 {
1960         struct obd_request_slot_waiter *orsw;
1961
1962         spin_lock(&cli->cl_loi_list_lock);
1963         cli->cl_r_in_flight--;
1964
1965         /* If there is free slot, wakeup the first waiter. */
1966         if (!list_empty(&cli->cl_loi_read_list) &&
1967             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
1968                 orsw = list_entry(cli->cl_loi_read_list.next,
1969                                   struct obd_request_slot_waiter, orsw_entry);
1970                 list_del_init(&orsw->orsw_entry);
1971                 cli->cl_r_in_flight++;
1972                 wake_up(&orsw->orsw_waitq);
1973         }
1974         spin_unlock(&cli->cl_loi_list_lock);
1975 }
1976 EXPORT_SYMBOL(obd_put_request_slot);
1977
1978 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1979 {
1980         return cli->cl_max_rpcs_in_flight;
1981 }
1982 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
1983
1984 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1985 {
1986         struct obd_request_slot_waiter *orsw;
1987         __u32                           old;
1988         int                             diff;
1989         int                             i;
1990
1991         if (max > OBD_MAX_RIF_MAX || max < 1)
1992                 return -ERANGE;
1993
1994         spin_lock(&cli->cl_loi_list_lock);
1995         old = cli->cl_max_rpcs_in_flight;
1996         cli->cl_max_rpcs_in_flight = max;
1997         diff = max - old;
1998
1999         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2000         for (i = 0; i < diff; i++) {
2001                 if (list_empty(&cli->cl_loi_read_list))
2002                         break;
2003
2004                 orsw = list_entry(cli->cl_loi_read_list.next,
2005                                   struct obd_request_slot_waiter, orsw_entry);
2006                 list_del_init(&orsw->orsw_entry);
2007                 cli->cl_r_in_flight++;
2008                 wake_up(&orsw->orsw_waitq);
2009         }
2010         spin_unlock(&cli->cl_loi_list_lock);
2011
2012         return 0;
2013 }
2014 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);