Whamcloud - gitweb
LU-5275 obdclass: Remove lprocfs_vars argument from class_register_type function
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include <obd_class.h>
44 #include <lprocfs_status.h>
45
46 spinlock_t obd_types_lock;
47
48 struct kmem_cache *obd_device_cachep;
49 struct kmem_cache *obdo_cachep;
50 EXPORT_SYMBOL(obdo_cachep);
51 struct kmem_cache *import_cachep;
52
53 struct list_head obd_zombie_imports;
54 struct list_head obd_zombie_exports;
55 spinlock_t  obd_zombie_impexp_lock;
56 static void obd_zombie_impexp_notify(void);
57 static void obd_zombie_export_add(struct obd_export *exp);
58 static void obd_zombie_import_add(struct obd_import *imp);
59 static void print_export_data(struct obd_export *exp,
60                               const char *status, int locks);
61
62 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
63 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
64
65 /*
66  * support functions: we could use inter-module communication, but this
67  * is more portable to other OS's
68  */
69 static struct obd_device *obd_device_alloc(void)
70 {
71         struct obd_device *obd;
72
73         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
74         if (obd != NULL) {
75                 obd->obd_magic = OBD_DEVICE_MAGIC;
76         }
77         return obd;
78 }
79
80 static void obd_device_free(struct obd_device *obd)
81 {
82         LASSERT(obd != NULL);
83         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
84                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
85         if (obd->obd_namespace != NULL) {
86                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
87                        obd, obd->obd_namespace, obd->obd_force);
88                 LBUG();
89         }
90         lu_ref_fini(&obd->obd_reference);
91         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
92 }
93
94 struct obd_type *class_search_type(const char *name)
95 {
96         struct list_head *tmp;
97         struct obd_type *type;
98
99         spin_lock(&obd_types_lock);
100         list_for_each(tmp, &obd_types) {
101                 type = list_entry(tmp, struct obd_type, typ_chain);
102                 if (strcmp(type->typ_name, name) == 0) {
103                         spin_unlock(&obd_types_lock);
104                         return type;
105                 }
106         }
107         spin_unlock(&obd_types_lock);
108         return NULL;
109 }
110 EXPORT_SYMBOL(class_search_type);
111
112 struct obd_type *class_get_type(const char *name)
113 {
114         struct obd_type *type = class_search_type(name);
115
116 #ifdef HAVE_MODULE_LOADING_SUPPORT
117         if (!type) {
118                 const char *modname = name;
119
120                 if (strcmp(modname, "obdfilter") == 0)
121                         modname = "ofd";
122
123                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
124                         modname = LUSTRE_OSP_NAME;
125
126                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
127                         modname = LUSTRE_MDT_NAME;
128
129                 if (!request_module("%s", modname)) {
130                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131                         type = class_search_type(name);
132                 } else {
133                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134                                            modname);
135                 }
136         }
137 #endif
138         if (type) {
139                 spin_lock(&type->obd_type_lock);
140                 type->typ_refcnt++;
141                 try_module_get(type->typ_dt_ops->o_owner);
142                 spin_unlock(&type->obd_type_lock);
143         }
144         return type;
145 }
146 EXPORT_SYMBOL(class_get_type);
147
148 void class_put_type(struct obd_type *type)
149 {
150         LASSERT(type);
151         spin_lock(&type->obd_type_lock);
152         type->typ_refcnt--;
153         module_put(type->typ_dt_ops->o_owner);
154         spin_unlock(&type->obd_type_lock);
155 }
156 EXPORT_SYMBOL(class_put_type);
157
158 #define CLASS_MAX_NAME 1024
159
160 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
161                         bool enable_proc, struct lprocfs_seq_vars *vars,
162                         const char *name, struct lu_device_type *ldt)
163 {
164         struct obd_type *type;
165         int rc = 0;
166         ENTRY;
167
168         /* sanity check */
169         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
170
171         if (class_search_type(name)) {
172                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
173                 RETURN(-EEXIST);
174         }
175
176         rc = -ENOMEM;
177         OBD_ALLOC(type, sizeof(*type));
178         if (type == NULL)
179                 RETURN(rc);
180
181         OBD_ALLOC_PTR(type->typ_dt_ops);
182         OBD_ALLOC_PTR(type->typ_md_ops);
183         OBD_ALLOC(type->typ_name, strlen(name) + 1);
184
185         if (type->typ_dt_ops == NULL ||
186             type->typ_md_ops == NULL ||
187             type->typ_name == NULL)
188                 GOTO (failed, rc);
189
190         *(type->typ_dt_ops) = *dt_ops;
191         /* md_ops is optional */
192         if (md_ops)
193                 *(type->typ_md_ops) = *md_ops;
194         strcpy(type->typ_name, name);
195         spin_lock_init(&type->obd_type_lock);
196
197 #ifdef LPROCFS
198         if (enable_proc) {
199                 type->typ_procroot = lprocfs_seq_register(type->typ_name,
200                                                           proc_lustre_root,
201                                                           vars, type);
202                 if (IS_ERR(type->typ_procroot)) {
203                         rc = PTR_ERR(type->typ_procroot);
204                         type->typ_procroot = NULL;
205                         GOTO(failed, rc);
206                 }
207         }
208 #endif
209         if (ldt != NULL) {
210                 type->typ_lu = ldt;
211                 rc = lu_device_type_init(ldt);
212                 if (rc != 0)
213                         GOTO (failed, rc);
214         }
215
216         spin_lock(&obd_types_lock);
217         list_add(&type->typ_chain, &obd_types);
218         spin_unlock(&obd_types_lock);
219
220         RETURN (0);
221
222 failed:
223         if (type->typ_name != NULL) {
224 #ifdef LPROCFS
225                 if (type->typ_procroot != NULL) {
226 #ifndef HAVE_ONLY_PROCFS_SEQ
227                         lprocfs_try_remove_proc_entry(type->typ_name,
228                                                       proc_lustre_root);
229 #else
230                         remove_proc_subtree(type->typ_name, proc_lustre_root);
231 #endif
232                 }
233 #endif
234                 OBD_FREE(type->typ_name, strlen(name) + 1);
235         }
236         if (type->typ_md_ops != NULL)
237                 OBD_FREE_PTR(type->typ_md_ops);
238         if (type->typ_dt_ops != NULL)
239                 OBD_FREE_PTR(type->typ_dt_ops);
240         OBD_FREE(type, sizeof(*type));
241         RETURN(rc);
242 }
243 EXPORT_SYMBOL(class_register_type);
244
245 int class_unregister_type(const char *name)
246 {
247         struct obd_type *type = class_search_type(name);
248         ENTRY;
249
250         if (!type) {
251                 CERROR("unknown obd type\n");
252                 RETURN(-EINVAL);
253         }
254
255         if (type->typ_refcnt) {
256                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
257                 /* This is a bad situation, let's make the best of it */
258                 /* Remove ops, but leave the name for debugging */
259                 OBD_FREE_PTR(type->typ_dt_ops);
260                 OBD_FREE_PTR(type->typ_md_ops);
261                 RETURN(-EBUSY);
262         }
263
264         /* we do not use type->typ_procroot as for compatibility purposes
265          * other modules can share names (i.e. lod can use lov entry). so
266          * we can't reference pointer as it can get invalided when another
267          * module removes the entry */
268 #ifdef LPROCFS
269         if (type->typ_procroot != NULL) {
270 #ifndef HAVE_ONLY_PROCFS_SEQ
271                 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
272 #else
273                 remove_proc_subtree(type->typ_name, proc_lustre_root);
274 #endif
275         }
276
277         if (type->typ_procsym != NULL)
278                 lprocfs_remove(&type->typ_procsym);
279 #endif
280         if (type->typ_lu)
281                 lu_device_type_fini(type->typ_lu);
282
283         spin_lock(&obd_types_lock);
284         list_del(&type->typ_chain);
285         spin_unlock(&obd_types_lock);
286         OBD_FREE(type->typ_name, strlen(name) + 1);
287         if (type->typ_dt_ops != NULL)
288                 OBD_FREE_PTR(type->typ_dt_ops);
289         if (type->typ_md_ops != NULL)
290                 OBD_FREE_PTR(type->typ_md_ops);
291         OBD_FREE(type, sizeof(*type));
292         RETURN(0);
293 } /* class_unregister_type */
294 EXPORT_SYMBOL(class_unregister_type);
295
296 /**
297  * Create a new obd device.
298  *
299  * Find an empty slot in ::obd_devs[], create a new obd device in it.
300  *
301  * \param[in] type_name obd device type string.
302  * \param[in] name      obd device name.
303  *
304  * \retval NULL if create fails, otherwise return the obd device
305  *         pointer created.
306  */
307 struct obd_device *class_newdev(const char *type_name, const char *name)
308 {
309         struct obd_device *result = NULL;
310         struct obd_device *newdev;
311         struct obd_type *type = NULL;
312         int i;
313         int new_obd_minor = 0;
314         ENTRY;
315
316         if (strlen(name) >= MAX_OBD_NAME) {
317                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
318                 RETURN(ERR_PTR(-EINVAL));
319         }
320
321         type = class_get_type(type_name);
322         if (type == NULL){
323                 CERROR("OBD: unknown type: %s\n", type_name);
324                 RETURN(ERR_PTR(-ENODEV));
325         }
326
327         newdev = obd_device_alloc();
328         if (newdev == NULL)
329                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
330
331         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
332
333         write_lock(&obd_dev_lock);
334         for (i = 0; i < class_devno_max(); i++) {
335                 struct obd_device *obd = class_num2obd(i);
336
337                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
338                         CERROR("Device %s already exists at %d, won't add\n",
339                                name, i);
340                         if (result) {
341                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
342                                          "%p obd_magic %08x != %08x\n", result,
343                                          result->obd_magic, OBD_DEVICE_MAGIC);
344                                 LASSERTF(result->obd_minor == new_obd_minor,
345                                          "%p obd_minor %d != %d\n", result,
346                                          result->obd_minor, new_obd_minor);
347
348                                 obd_devs[result->obd_minor] = NULL;
349                                 result->obd_name[0]='\0';
350                          }
351                         result = ERR_PTR(-EEXIST);
352                         break;
353                 }
354                 if (!result && !obd) {
355                         result = newdev;
356                         result->obd_minor = i;
357                         new_obd_minor = i;
358                         result->obd_type = type;
359                         strncpy(result->obd_name, name,
360                                 sizeof(result->obd_name) - 1);
361                         obd_devs[i] = result;
362                 }
363         }
364         write_unlock(&obd_dev_lock);
365
366         if (result == NULL && i >= class_devno_max()) {
367                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
368                        class_devno_max());
369                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
370         }
371
372         if (IS_ERR(result))
373                 GOTO(out, result);
374
375         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
376                result->obd_name, result);
377
378         RETURN(result);
379 out:
380         obd_device_free(newdev);
381 out_type:
382         class_put_type(type);
383         return result;
384 }
385
386 void class_release_dev(struct obd_device *obd)
387 {
388         struct obd_type *obd_type = obd->obd_type;
389
390         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
391                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
392         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
393                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
394         LASSERT(obd_type != NULL);
395
396         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
397                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
398
399         write_lock(&obd_dev_lock);
400         obd_devs[obd->obd_minor] = NULL;
401         write_unlock(&obd_dev_lock);
402         obd_device_free(obd);
403
404         class_put_type(obd_type);
405 }
406
407 int class_name2dev(const char *name)
408 {
409         int i;
410
411         if (!name)
412                 return -1;
413
414         read_lock(&obd_dev_lock);
415         for (i = 0; i < class_devno_max(); i++) {
416                 struct obd_device *obd = class_num2obd(i);
417
418                 if (obd && strcmp(name, obd->obd_name) == 0) {
419                         /* Make sure we finished attaching before we give
420                            out any references */
421                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
422                         if (obd->obd_attached) {
423                                 read_unlock(&obd_dev_lock);
424                                 return i;
425                         }
426                         break;
427                 }
428         }
429         read_unlock(&obd_dev_lock);
430
431         return -1;
432 }
433 EXPORT_SYMBOL(class_name2dev);
434
435 struct obd_device *class_name2obd(const char *name)
436 {
437         int dev = class_name2dev(name);
438
439         if (dev < 0 || dev > class_devno_max())
440                 return NULL;
441         return class_num2obd(dev);
442 }
443 EXPORT_SYMBOL(class_name2obd);
444
445 int class_uuid2dev(struct obd_uuid *uuid)
446 {
447         int i;
448
449         read_lock(&obd_dev_lock);
450         for (i = 0; i < class_devno_max(); i++) {
451                 struct obd_device *obd = class_num2obd(i);
452
453                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
454                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
455                         read_unlock(&obd_dev_lock);
456                         return i;
457                 }
458         }
459         read_unlock(&obd_dev_lock);
460
461         return -1;
462 }
463 EXPORT_SYMBOL(class_uuid2dev);
464
465 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
466 {
467         int dev = class_uuid2dev(uuid);
468         if (dev < 0)
469                 return NULL;
470         return class_num2obd(dev);
471 }
472 EXPORT_SYMBOL(class_uuid2obd);
473
474 /**
475  * Get obd device from ::obd_devs[]
476  *
477  * \param num [in] array index
478  *
479  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
480  *         otherwise return the obd device there.
481  */
482 struct obd_device *class_num2obd(int num)
483 {
484         struct obd_device *obd = NULL;
485
486         if (num < class_devno_max()) {
487                 obd = obd_devs[num];
488                 if (obd == NULL)
489                         return NULL;
490
491                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
492                          "%p obd_magic %08x != %08x\n",
493                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
494                 LASSERTF(obd->obd_minor == num,
495                          "%p obd_minor %0d != %0d\n",
496                          obd, obd->obd_minor, num);
497         }
498
499         return obd;
500 }
501 EXPORT_SYMBOL(class_num2obd);
502
503 /**
504  * Get obd devices count. Device in any
505  *    state are counted
506  * \retval obd device count
507  */
508 int get_devices_count(void)
509 {
510         int index, max_index = class_devno_max(), dev_count = 0;
511
512         read_lock(&obd_dev_lock);
513         for (index = 0; index <= max_index; index++) {
514                 struct obd_device *obd = class_num2obd(index);
515                 if (obd != NULL)
516                         dev_count++;
517         }
518         read_unlock(&obd_dev_lock);
519
520         return dev_count;
521 }
522 EXPORT_SYMBOL(get_devices_count);
523
524 void class_obd_list(void)
525 {
526         char *status;
527         int i;
528
529         read_lock(&obd_dev_lock);
530         for (i = 0; i < class_devno_max(); i++) {
531                 struct obd_device *obd = class_num2obd(i);
532
533                 if (obd == NULL)
534                         continue;
535                 if (obd->obd_stopping)
536                         status = "ST";
537                 else if (obd->obd_set_up)
538                         status = "UP";
539                 else if (obd->obd_attached)
540                         status = "AT";
541                 else
542                         status = "--";
543                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
544                          i, status, obd->obd_type->typ_name,
545                          obd->obd_name, obd->obd_uuid.uuid,
546                          atomic_read(&obd->obd_refcount));
547         }
548         read_unlock(&obd_dev_lock);
549         return;
550 }
551
552 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
553    specified, then only the client with that uuid is returned,
554    otherwise any client connected to the tgt is returned. */
555 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
556                                           const char * typ_name,
557                                           struct obd_uuid *grp_uuid)
558 {
559         int i;
560
561         read_lock(&obd_dev_lock);
562         for (i = 0; i < class_devno_max(); i++) {
563                 struct obd_device *obd = class_num2obd(i);
564
565                 if (obd == NULL)
566                         continue;
567                 if ((strncmp(obd->obd_type->typ_name, typ_name,
568                              strlen(typ_name)) == 0)) {
569                         if (obd_uuid_equals(tgt_uuid,
570                                             &obd->u.cli.cl_target_uuid) &&
571                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
572                                                          &obd->obd_uuid) : 1)) {
573                                 read_unlock(&obd_dev_lock);
574                                 return obd;
575                         }
576                 }
577         }
578         read_unlock(&obd_dev_lock);
579
580         return NULL;
581 }
582 EXPORT_SYMBOL(class_find_client_obd);
583
584 /* Iterate the obd_device list looking devices have grp_uuid. Start
585    searching at *next, and if a device is found, the next index to look
586    at is saved in *next. If next is NULL, then the first matching device
587    will always be returned. */
588 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
589 {
590         int i;
591
592         if (next == NULL)
593                 i = 0;
594         else if (*next >= 0 && *next < class_devno_max())
595                 i = *next;
596         else
597                 return NULL;
598
599         read_lock(&obd_dev_lock);
600         for (; i < class_devno_max(); i++) {
601                 struct obd_device *obd = class_num2obd(i);
602
603                 if (obd == NULL)
604                         continue;
605                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
606                         if (next != NULL)
607                                 *next = i+1;
608                         read_unlock(&obd_dev_lock);
609                         return obd;
610                 }
611         }
612         read_unlock(&obd_dev_lock);
613
614         return NULL;
615 }
616 EXPORT_SYMBOL(class_devices_in_group);
617
618 /**
619  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
620  * adjust sptlrpc settings accordingly.
621  */
622 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
623 {
624         struct obd_device  *obd;
625         const char         *type;
626         int                 i, rc = 0, rc2;
627
628         LASSERT(namelen > 0);
629
630         read_lock(&obd_dev_lock);
631         for (i = 0; i < class_devno_max(); i++) {
632                 obd = class_num2obd(i);
633
634                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
635                         continue;
636
637                 /* only notify mdc, osc, mdt, ost */
638                 type = obd->obd_type->typ_name;
639                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
640                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
641                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
642                     strcmp(type, LUSTRE_OST_NAME) != 0)
643                         continue;
644
645                 if (strncmp(obd->obd_name, fsname, namelen))
646                         continue;
647
648                 class_incref(obd, __FUNCTION__, obd);
649                 read_unlock(&obd_dev_lock);
650                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
651                                          sizeof(KEY_SPTLRPC_CONF),
652                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
653                 rc = rc ? rc : rc2;
654                 class_decref(obd, __FUNCTION__, obd);
655                 read_lock(&obd_dev_lock);
656         }
657         read_unlock(&obd_dev_lock);
658         return rc;
659 }
660 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
661
662 void obd_cleanup_caches(void)
663 {
664         ENTRY;
665         if (obd_device_cachep) {
666                 kmem_cache_destroy(obd_device_cachep);
667                 obd_device_cachep = NULL;
668         }
669         if (obdo_cachep) {
670                 kmem_cache_destroy(obdo_cachep);
671                 obdo_cachep = NULL;
672         }
673         if (import_cachep) {
674                 kmem_cache_destroy(import_cachep);
675                 import_cachep = NULL;
676         }
677         if (capa_cachep) {
678                 kmem_cache_destroy(capa_cachep);
679                 capa_cachep = NULL;
680         }
681         EXIT;
682 }
683
684 int obd_init_caches(void)
685 {
686         int rc;
687         ENTRY;
688
689         LASSERT(obd_device_cachep == NULL);
690         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
691                                               sizeof(struct obd_device),
692                                               0, 0, NULL);
693         if (!obd_device_cachep)
694                 GOTO(out, rc = -ENOMEM);
695
696         LASSERT(obdo_cachep == NULL);
697         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
698                                         0, 0, NULL);
699         if (!obdo_cachep)
700                 GOTO(out, rc = -ENOMEM);
701
702         LASSERT(import_cachep == NULL);
703         import_cachep = kmem_cache_create("ll_import_cache",
704                                           sizeof(struct obd_import),
705                                           0, 0, NULL);
706         if (!import_cachep)
707                 GOTO(out, rc = -ENOMEM);
708
709         LASSERT(capa_cachep == NULL);
710         capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
711                                         0, 0, NULL);
712         if (!capa_cachep)
713                 GOTO(out, rc = -ENOMEM);
714
715         RETURN(0);
716 out:
717         obd_cleanup_caches();
718         RETURN(rc);
719 }
720
721 /* map connection to client */
722 struct obd_export *class_conn2export(struct lustre_handle *conn)
723 {
724         struct obd_export *export;
725         ENTRY;
726
727         if (!conn) {
728                 CDEBUG(D_CACHE, "looking for null handle\n");
729                 RETURN(NULL);
730         }
731
732         if (conn->cookie == -1) {  /* this means assign a new connection */
733                 CDEBUG(D_CACHE, "want a new connection\n");
734                 RETURN(NULL);
735         }
736
737         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
738         export = class_handle2object(conn->cookie, NULL);
739         RETURN(export);
740 }
741 EXPORT_SYMBOL(class_conn2export);
742
743 struct obd_device *class_exp2obd(struct obd_export *exp)
744 {
745         if (exp)
746                 return exp->exp_obd;
747         return NULL;
748 }
749 EXPORT_SYMBOL(class_exp2obd);
750
751 struct obd_device *class_conn2obd(struct lustre_handle *conn)
752 {
753         struct obd_export *export;
754         export = class_conn2export(conn);
755         if (export) {
756                 struct obd_device *obd = export->exp_obd;
757                 class_export_put(export);
758                 return obd;
759         }
760         return NULL;
761 }
762 EXPORT_SYMBOL(class_conn2obd);
763
764 struct obd_import *class_exp2cliimp(struct obd_export *exp)
765 {
766         struct obd_device *obd = exp->exp_obd;
767         if (obd == NULL)
768                 return NULL;
769         return obd->u.cli.cl_import;
770 }
771 EXPORT_SYMBOL(class_exp2cliimp);
772
773 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
774 {
775         struct obd_device *obd = class_conn2obd(conn);
776         if (obd == NULL)
777                 return NULL;
778         return obd->u.cli.cl_import;
779 }
780 EXPORT_SYMBOL(class_conn2cliimp);
781
782 /* Export management functions */
783 static void class_export_destroy(struct obd_export *exp)
784 {
785         struct obd_device *obd = exp->exp_obd;
786         ENTRY;
787
788         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
789         LASSERT(obd != NULL);
790
791         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
792                exp->exp_client_uuid.uuid, obd->obd_name);
793
794         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
795         if (exp->exp_connection)
796                 ptlrpc_put_connection_superhack(exp->exp_connection);
797
798         LASSERT(list_empty(&exp->exp_outstanding_replies));
799         LASSERT(list_empty(&exp->exp_uncommitted_replies));
800         LASSERT(list_empty(&exp->exp_req_replay_queue));
801         LASSERT(list_empty(&exp->exp_hp_rpcs));
802         obd_destroy_export(exp);
803         class_decref(obd, "export", exp);
804
805         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
806         EXIT;
807 }
808
809 static void export_handle_addref(void *export)
810 {
811         class_export_get(export);
812 }
813
814 static struct portals_handle_ops export_handle_ops = {
815         .hop_addref = export_handle_addref,
816         .hop_free   = NULL,
817 };
818
819 struct obd_export *class_export_get(struct obd_export *exp)
820 {
821         atomic_inc(&exp->exp_refcount);
822         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
823                atomic_read(&exp->exp_refcount));
824         return exp;
825 }
826 EXPORT_SYMBOL(class_export_get);
827
828 void class_export_put(struct obd_export *exp)
829 {
830         LASSERT(exp != NULL);
831         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
832         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
833                atomic_read(&exp->exp_refcount) - 1);
834
835         if (atomic_dec_and_test(&exp->exp_refcount)) {
836                 LASSERT(!list_empty(&exp->exp_obd_chain));
837                 CDEBUG(D_IOCTL, "final put %p/%s\n",
838                        exp, exp->exp_client_uuid.uuid);
839
840                 /* release nid stat refererence */
841                 lprocfs_exp_cleanup(exp);
842
843                 obd_zombie_export_add(exp);
844         }
845 }
846 EXPORT_SYMBOL(class_export_put);
847
848 /* Creates a new export, adds it to the hash table, and returns a
849  * pointer to it. The refcount is 2: one for the hash reference, and
850  * one for the pointer returned by this function. */
851 struct obd_export *class_new_export(struct obd_device *obd,
852                                     struct obd_uuid *cluuid)
853 {
854         struct obd_export *export;
855         cfs_hash_t *hash = NULL;
856         int rc = 0;
857         ENTRY;
858
859         OBD_ALLOC_PTR(export);
860         if (!export)
861                 return ERR_PTR(-ENOMEM);
862
863         export->exp_conn_cnt = 0;
864         export->exp_lock_hash = NULL;
865         export->exp_flock_hash = NULL;
866         atomic_set(&export->exp_refcount, 2);
867         atomic_set(&export->exp_rpc_count, 0);
868         atomic_set(&export->exp_cb_count, 0);
869         atomic_set(&export->exp_locks_count, 0);
870 #if LUSTRE_TRACKS_LOCK_EXP_REFS
871         INIT_LIST_HEAD(&export->exp_locks_list);
872         spin_lock_init(&export->exp_locks_list_guard);
873 #endif
874         atomic_set(&export->exp_replay_count, 0);
875         export->exp_obd = obd;
876         INIT_LIST_HEAD(&export->exp_outstanding_replies);
877         spin_lock_init(&export->exp_uncommitted_replies_lock);
878         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
879         INIT_LIST_HEAD(&export->exp_req_replay_queue);
880         INIT_LIST_HEAD(&export->exp_handle.h_link);
881         INIT_LIST_HEAD(&export->exp_hp_rpcs);
882         INIT_LIST_HEAD(&export->exp_reg_rpcs);
883         class_handle_hash(&export->exp_handle, &export_handle_ops);
884         export->exp_last_request_time = cfs_time_current_sec();
885         spin_lock_init(&export->exp_lock);
886         spin_lock_init(&export->exp_rpc_lock);
887         INIT_HLIST_NODE(&export->exp_uuid_hash);
888         INIT_HLIST_NODE(&export->exp_nid_hash);
889         spin_lock_init(&export->exp_bl_list_lock);
890         INIT_LIST_HEAD(&export->exp_bl_list);
891
892         export->exp_sp_peer = LUSTRE_SP_ANY;
893         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
894         export->exp_client_uuid = *cluuid;
895         obd_init_export(export);
896
897         spin_lock(&obd->obd_dev_lock);
898         /* shouldn't happen, but might race */
899         if (obd->obd_stopping)
900                 GOTO(exit_unlock, rc = -ENODEV);
901
902         hash = cfs_hash_getref(obd->obd_uuid_hash);
903         if (hash == NULL)
904                 GOTO(exit_unlock, rc = -ENODEV);
905         spin_unlock(&obd->obd_dev_lock);
906
907         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
908                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
909                 if (rc != 0) {
910                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
911                                       obd->obd_name, cluuid->uuid, rc);
912                         GOTO(exit_err, rc = -EALREADY);
913                 }
914         }
915
916         spin_lock(&obd->obd_dev_lock);
917         if (obd->obd_stopping) {
918                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
919                 GOTO(exit_unlock, rc = -ENODEV);
920         }
921
922         class_incref(obd, "export", export);
923         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
924         list_add_tail(&export->exp_obd_chain_timed,
925                       &export->exp_obd->obd_exports_timed);
926         export->exp_obd->obd_num_exports++;
927         spin_unlock(&obd->obd_dev_lock);
928         cfs_hash_putref(hash);
929         RETURN(export);
930
931 exit_unlock:
932         spin_unlock(&obd->obd_dev_lock);
933 exit_err:
934         if (hash)
935                 cfs_hash_putref(hash);
936         class_handle_unhash(&export->exp_handle);
937         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
938         obd_destroy_export(export);
939         OBD_FREE_PTR(export);
940         return ERR_PTR(rc);
941 }
942 EXPORT_SYMBOL(class_new_export);
943
944 void class_unlink_export(struct obd_export *exp)
945 {
946         class_handle_unhash(&exp->exp_handle);
947
948         spin_lock(&exp->exp_obd->obd_dev_lock);
949         /* delete an uuid-export hashitem from hashtables */
950         if (!hlist_unhashed(&exp->exp_uuid_hash))
951                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
952                              &exp->exp_client_uuid,
953                              &exp->exp_uuid_hash);
954
955         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
956         list_del_init(&exp->exp_obd_chain_timed);
957         exp->exp_obd->obd_num_exports--;
958         spin_unlock(&exp->exp_obd->obd_dev_lock);
959         class_export_put(exp);
960 }
961 EXPORT_SYMBOL(class_unlink_export);
962
963 /* Import management functions */
964 void class_import_destroy(struct obd_import *imp)
965 {
966         ENTRY;
967
968         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
969                 imp->imp_obd->obd_name);
970
971         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
972
973         ptlrpc_put_connection_superhack(imp->imp_connection);
974
975         while (!list_empty(&imp->imp_conn_list)) {
976                 struct obd_import_conn *imp_conn;
977
978                 imp_conn = list_entry(imp->imp_conn_list.next,
979                                       struct obd_import_conn, oic_item);
980                 list_del_init(&imp_conn->oic_item);
981                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
982                 OBD_FREE(imp_conn, sizeof(*imp_conn));
983         }
984
985         LASSERT(imp->imp_sec == NULL);
986         class_decref(imp->imp_obd, "import", imp);
987         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
988         EXIT;
989 }
990
991 static void import_handle_addref(void *import)
992 {
993         class_import_get(import);
994 }
995
996 static struct portals_handle_ops import_handle_ops = {
997         .hop_addref = import_handle_addref,
998         .hop_free   = NULL,
999 };
1000
1001 struct obd_import *class_import_get(struct obd_import *import)
1002 {
1003         atomic_inc(&import->imp_refcount);
1004         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1005                atomic_read(&import->imp_refcount),
1006                import->imp_obd->obd_name);
1007         return import;
1008 }
1009 EXPORT_SYMBOL(class_import_get);
1010
1011 void class_import_put(struct obd_import *imp)
1012 {
1013         ENTRY;
1014
1015         LASSERT(list_empty(&imp->imp_zombie_chain));
1016         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1017
1018         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1019                atomic_read(&imp->imp_refcount) - 1,
1020                imp->imp_obd->obd_name);
1021
1022         if (atomic_dec_and_test(&imp->imp_refcount)) {
1023                 CDEBUG(D_INFO, "final put import %p\n", imp);
1024                 obd_zombie_import_add(imp);
1025         }
1026
1027         /* catch possible import put race */
1028         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1029         EXIT;
1030 }
1031 EXPORT_SYMBOL(class_import_put);
1032
1033 static void init_imp_at(struct imp_at *at) {
1034         int i;
1035         at_init(&at->iat_net_latency, 0, 0);
1036         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1037                 /* max service estimates are tracked on the server side, so
1038                    don't use the AT history here, just use the last reported
1039                    val. (But keep hist for proc histogram, worst_ever) */
1040                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1041                         AT_FLG_NOHIST);
1042         }
1043 }
1044
1045 struct obd_import *class_new_import(struct obd_device *obd)
1046 {
1047         struct obd_import *imp;
1048
1049         OBD_ALLOC(imp, sizeof(*imp));
1050         if (imp == NULL)
1051                 return NULL;
1052
1053         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1054         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1055         INIT_LIST_HEAD(&imp->imp_replay_list);
1056         INIT_LIST_HEAD(&imp->imp_sending_list);
1057         INIT_LIST_HEAD(&imp->imp_delayed_list);
1058         INIT_LIST_HEAD(&imp->imp_committed_list);
1059         imp->imp_replay_cursor = &imp->imp_committed_list;
1060         spin_lock_init(&imp->imp_lock);
1061         imp->imp_last_success_conn = 0;
1062         imp->imp_state = LUSTRE_IMP_NEW;
1063         imp->imp_obd = class_incref(obd, "import", imp);
1064         mutex_init(&imp->imp_sec_mutex);
1065         init_waitqueue_head(&imp->imp_recovery_waitq);
1066
1067         atomic_set(&imp->imp_refcount, 2);
1068         atomic_set(&imp->imp_unregistering, 0);
1069         atomic_set(&imp->imp_inflight, 0);
1070         atomic_set(&imp->imp_replay_inflight, 0);
1071         atomic_set(&imp->imp_inval_count, 0);
1072         INIT_LIST_HEAD(&imp->imp_conn_list);
1073         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1074         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1075         init_imp_at(&imp->imp_at);
1076
1077         /* the default magic is V2, will be used in connect RPC, and
1078          * then adjusted according to the flags in request/reply. */
1079         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1080
1081         return imp;
1082 }
1083 EXPORT_SYMBOL(class_new_import);
1084
1085 void class_destroy_import(struct obd_import *import)
1086 {
1087         LASSERT(import != NULL);
1088         LASSERT(import != LP_POISON);
1089
1090         class_handle_unhash(&import->imp_handle);
1091
1092         spin_lock(&import->imp_lock);
1093         import->imp_generation++;
1094         spin_unlock(&import->imp_lock);
1095         class_import_put(import);
1096 }
1097 EXPORT_SYMBOL(class_destroy_import);
1098
1099 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1100
1101 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1102 {
1103         spin_lock(&exp->exp_locks_list_guard);
1104
1105         LASSERT(lock->l_exp_refs_nr >= 0);
1106
1107         if (lock->l_exp_refs_target != NULL &&
1108             lock->l_exp_refs_target != exp) {
1109                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1110                               exp, lock, lock->l_exp_refs_target);
1111         }
1112         if ((lock->l_exp_refs_nr ++) == 0) {
1113                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1114                 lock->l_exp_refs_target = exp;
1115         }
1116         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1117                lock, exp, lock->l_exp_refs_nr);
1118         spin_unlock(&exp->exp_locks_list_guard);
1119 }
1120 EXPORT_SYMBOL(__class_export_add_lock_ref);
1121
1122 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1123 {
1124         spin_lock(&exp->exp_locks_list_guard);
1125         LASSERT(lock->l_exp_refs_nr > 0);
1126         if (lock->l_exp_refs_target != exp) {
1127                 LCONSOLE_WARN("lock %p, "
1128                               "mismatching export pointers: %p, %p\n",
1129                               lock, lock->l_exp_refs_target, exp);
1130         }
1131         if (-- lock->l_exp_refs_nr == 0) {
1132                 list_del_init(&lock->l_exp_refs_link);
1133                 lock->l_exp_refs_target = NULL;
1134         }
1135         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1136                lock, exp, lock->l_exp_refs_nr);
1137         spin_unlock(&exp->exp_locks_list_guard);
1138 }
1139 EXPORT_SYMBOL(__class_export_del_lock_ref);
1140 #endif
1141
1142 /* A connection defines an export context in which preallocation can
1143    be managed. This releases the export pointer reference, and returns
1144    the export handle, so the export refcount is 1 when this function
1145    returns. */
1146 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1147                   struct obd_uuid *cluuid)
1148 {
1149         struct obd_export *export;
1150         LASSERT(conn != NULL);
1151         LASSERT(obd != NULL);
1152         LASSERT(cluuid != NULL);
1153         ENTRY;
1154
1155         export = class_new_export(obd, cluuid);
1156         if (IS_ERR(export))
1157                 RETURN(PTR_ERR(export));
1158
1159         conn->cookie = export->exp_handle.h_cookie;
1160         class_export_put(export);
1161
1162         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1163                cluuid->uuid, conn->cookie);
1164         RETURN(0);
1165 }
1166 EXPORT_SYMBOL(class_connect);
1167
1168 /* if export is involved in recovery then clean up related things */
1169 void class_export_recovery_cleanup(struct obd_export *exp)
1170 {
1171         struct obd_device *obd = exp->exp_obd;
1172
1173         spin_lock(&obd->obd_recovery_task_lock);
1174         if (obd->obd_recovering) {
1175                 if (exp->exp_in_recovery) {
1176                         spin_lock(&exp->exp_lock);
1177                         exp->exp_in_recovery = 0;
1178                         spin_unlock(&exp->exp_lock);
1179                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1180                         atomic_dec(&obd->obd_connected_clients);
1181                 }
1182
1183                 /* if called during recovery then should update
1184                  * obd_stale_clients counter,
1185                  * lightweight exports are not counted */
1186                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1187                         exp->exp_obd->obd_stale_clients++;
1188         }
1189         spin_unlock(&obd->obd_recovery_task_lock);
1190
1191         spin_lock(&exp->exp_lock);
1192         /** Cleanup req replay fields */
1193         if (exp->exp_req_replay_needed) {
1194                 exp->exp_req_replay_needed = 0;
1195
1196                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1197                 atomic_dec(&obd->obd_req_replay_clients);
1198         }
1199
1200         /** Cleanup lock replay data */
1201         if (exp->exp_lock_replay_needed) {
1202                 exp->exp_lock_replay_needed = 0;
1203
1204                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1205                 atomic_dec(&obd->obd_lock_replay_clients);
1206         }
1207         spin_unlock(&exp->exp_lock);
1208 }
1209
1210 /* This function removes 1-3 references from the export:
1211  * 1 - for export pointer passed
1212  * and if disconnect really need
1213  * 2 - removing from hash
1214  * 3 - in client_unlink_export
1215  * The export pointer passed to this function can destroyed */
1216 int class_disconnect(struct obd_export *export)
1217 {
1218         int already_disconnected;
1219         ENTRY;
1220
1221         if (export == NULL) {
1222                 CWARN("attempting to free NULL export %p\n", export);
1223                 RETURN(-EINVAL);
1224         }
1225
1226         spin_lock(&export->exp_lock);
1227         already_disconnected = export->exp_disconnected;
1228         export->exp_disconnected = 1;
1229         spin_unlock(&export->exp_lock);
1230
1231         /* class_cleanup(), abort_recovery(), and class_fail_export()
1232          * all end up in here, and if any of them race we shouldn't
1233          * call extra class_export_puts(). */
1234         if (already_disconnected) {
1235                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1236                 GOTO(no_disconn, already_disconnected);
1237         }
1238
1239         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1240                export->exp_handle.h_cookie);
1241
1242         if (!hlist_unhashed(&export->exp_nid_hash))
1243                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1244                              &export->exp_connection->c_peer.nid,
1245                              &export->exp_nid_hash);
1246
1247         class_export_recovery_cleanup(export);
1248         class_unlink_export(export);
1249 no_disconn:
1250         class_export_put(export);
1251         RETURN(0);
1252 }
1253 EXPORT_SYMBOL(class_disconnect);
1254
1255 /* Return non-zero for a fully connected export */
1256 int class_connected_export(struct obd_export *exp)
1257 {
1258         int connected = 0;
1259
1260         if (exp) {
1261                 spin_lock(&exp->exp_lock);
1262                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1263                 spin_unlock(&exp->exp_lock);
1264         }
1265         return connected;
1266 }
1267 EXPORT_SYMBOL(class_connected_export);
1268
1269 static void class_disconnect_export_list(struct list_head *list,
1270                                          enum obd_option flags)
1271 {
1272         int rc;
1273         struct obd_export *exp;
1274         ENTRY;
1275
1276         /* It's possible that an export may disconnect itself, but
1277          * nothing else will be added to this list. */
1278         while (!list_empty(list)) {
1279                 exp = list_entry(list->next, struct obd_export,
1280                                  exp_obd_chain);
1281                 /* need for safe call CDEBUG after obd_disconnect */
1282                 class_export_get(exp);
1283
1284                 spin_lock(&exp->exp_lock);
1285                 exp->exp_flags = flags;
1286                 spin_unlock(&exp->exp_lock);
1287
1288                 if (obd_uuid_equals(&exp->exp_client_uuid,
1289                                     &exp->exp_obd->obd_uuid)) {
1290                         CDEBUG(D_HA,
1291                                "exp %p export uuid == obd uuid, don't discon\n",
1292                                exp);
1293                         /* Need to delete this now so we don't end up pointing
1294                          * to work_list later when this export is cleaned up. */
1295                         list_del_init(&exp->exp_obd_chain);
1296                         class_export_put(exp);
1297                         continue;
1298                 }
1299
1300                 class_export_get(exp);
1301                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1302                        "last request at "CFS_TIME_T"\n",
1303                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1304                        exp, exp->exp_last_request_time);
1305                 /* release one export reference anyway */
1306                 rc = obd_disconnect(exp);
1307
1308                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1309                        obd_export_nid2str(exp), exp, rc);
1310                 class_export_put(exp);
1311         }
1312         EXIT;
1313 }
1314
1315 void class_disconnect_exports(struct obd_device *obd)
1316 {
1317         struct list_head work_list;
1318         ENTRY;
1319
1320         /* Move all of the exports from obd_exports to a work list, en masse. */
1321         INIT_LIST_HEAD(&work_list);
1322         spin_lock(&obd->obd_dev_lock);
1323         list_splice_init(&obd->obd_exports, &work_list);
1324         list_splice_init(&obd->obd_delayed_exports, &work_list);
1325         spin_unlock(&obd->obd_dev_lock);
1326
1327         if (!list_empty(&work_list)) {
1328                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1329                        "disconnecting them\n", obd->obd_minor, obd);
1330                 class_disconnect_export_list(&work_list,
1331                                              exp_flags_from_obd(obd));
1332         } else
1333                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1334                        obd->obd_minor, obd);
1335         EXIT;
1336 }
1337 EXPORT_SYMBOL(class_disconnect_exports);
1338
1339 /* Remove exports that have not completed recovery.
1340  */
1341 void class_disconnect_stale_exports(struct obd_device *obd,
1342                                     int (*test_export)(struct obd_export *))
1343 {
1344         struct list_head work_list;
1345         struct obd_export *exp, *n;
1346         int evicted = 0;
1347         ENTRY;
1348
1349         INIT_LIST_HEAD(&work_list);
1350         spin_lock(&obd->obd_dev_lock);
1351         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1352                                  exp_obd_chain) {
1353                 /* don't count self-export as client */
1354                 if (obd_uuid_equals(&exp->exp_client_uuid,
1355                                     &exp->exp_obd->obd_uuid))
1356                         continue;
1357
1358                 /* don't evict clients which have no slot in last_rcvd
1359                  * (e.g. lightweight connection) */
1360                 if (exp->exp_target_data.ted_lr_idx == -1)
1361                         continue;
1362
1363                 spin_lock(&exp->exp_lock);
1364                 if (exp->exp_failed || test_export(exp)) {
1365                         spin_unlock(&exp->exp_lock);
1366                         continue;
1367                 }
1368                 exp->exp_failed = 1;
1369                 spin_unlock(&exp->exp_lock);
1370
1371                 list_move(&exp->exp_obd_chain, &work_list);
1372                 evicted++;
1373                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1374                        obd->obd_name, exp->exp_client_uuid.uuid,
1375                        exp->exp_connection == NULL ? "<unknown>" :
1376                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1377                 print_export_data(exp, "EVICTING", 0);
1378         }
1379         spin_unlock(&obd->obd_dev_lock);
1380
1381         if (evicted)
1382                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1383                               obd->obd_name, evicted);
1384
1385         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1386                                                  OBD_OPT_ABORT_RECOV);
1387         EXIT;
1388 }
1389 EXPORT_SYMBOL(class_disconnect_stale_exports);
1390
1391 void class_fail_export(struct obd_export *exp)
1392 {
1393         int rc, already_failed;
1394
1395         spin_lock(&exp->exp_lock);
1396         already_failed = exp->exp_failed;
1397         exp->exp_failed = 1;
1398         spin_unlock(&exp->exp_lock);
1399
1400         if (already_failed) {
1401                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1402                        exp, exp->exp_client_uuid.uuid);
1403                 return;
1404         }
1405
1406         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1407                exp, exp->exp_client_uuid.uuid);
1408
1409         if (obd_dump_on_timeout)
1410                 libcfs_debug_dumplog();
1411
1412         /* need for safe call CDEBUG after obd_disconnect */
1413         class_export_get(exp);
1414
1415         /* Most callers into obd_disconnect are removing their own reference
1416          * (request, for example) in addition to the one from the hash table.
1417          * We don't have such a reference here, so make one. */
1418         class_export_get(exp);
1419         rc = obd_disconnect(exp);
1420         if (rc)
1421                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1422         else
1423                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1424                        exp, exp->exp_client_uuid.uuid);
1425         class_export_put(exp);
1426 }
1427 EXPORT_SYMBOL(class_fail_export);
1428
1429 char *obd_export_nid2str(struct obd_export *exp)
1430 {
1431         if (exp->exp_connection != NULL)
1432                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1433
1434         return "(no nid)";
1435 }
1436 EXPORT_SYMBOL(obd_export_nid2str);
1437
1438 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1439 {
1440         cfs_hash_t *nid_hash;
1441         struct obd_export *doomed_exp = NULL;
1442         int exports_evicted = 0;
1443
1444         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1445
1446         spin_lock(&obd->obd_dev_lock);
1447         /* umount has run already, so evict thread should leave
1448          * its task to umount thread now */
1449         if (obd->obd_stopping) {
1450                 spin_unlock(&obd->obd_dev_lock);
1451                 return exports_evicted;
1452         }
1453         nid_hash = obd->obd_nid_hash;
1454         cfs_hash_getref(nid_hash);
1455         spin_unlock(&obd->obd_dev_lock);
1456
1457         do {
1458                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1459                 if (doomed_exp == NULL)
1460                         break;
1461
1462                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1463                          "nid %s found, wanted nid %s, requested nid %s\n",
1464                          obd_export_nid2str(doomed_exp),
1465                          libcfs_nid2str(nid_key), nid);
1466                 LASSERTF(doomed_exp != obd->obd_self_export,
1467                          "self-export is hashed by NID?\n");
1468                 exports_evicted++;
1469                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1470                               "request\n", obd->obd_name,
1471                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1472                               obd_export_nid2str(doomed_exp));
1473                 class_fail_export(doomed_exp);
1474                 class_export_put(doomed_exp);
1475         } while (1);
1476
1477         cfs_hash_putref(nid_hash);
1478
1479         if (!exports_evicted)
1480                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1481                        obd->obd_name, nid);
1482         return exports_evicted;
1483 }
1484 EXPORT_SYMBOL(obd_export_evict_by_nid);
1485
1486 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1487 {
1488         cfs_hash_t *uuid_hash;
1489         struct obd_export *doomed_exp = NULL;
1490         struct obd_uuid doomed_uuid;
1491         int exports_evicted = 0;
1492
1493         spin_lock(&obd->obd_dev_lock);
1494         if (obd->obd_stopping) {
1495                 spin_unlock(&obd->obd_dev_lock);
1496                 return exports_evicted;
1497         }
1498         uuid_hash = obd->obd_uuid_hash;
1499         cfs_hash_getref(uuid_hash);
1500         spin_unlock(&obd->obd_dev_lock);
1501
1502         obd_str2uuid(&doomed_uuid, uuid);
1503         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1504                 CERROR("%s: can't evict myself\n", obd->obd_name);
1505                 cfs_hash_putref(uuid_hash);
1506                 return exports_evicted;
1507         }
1508
1509         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1510
1511         if (doomed_exp == NULL) {
1512                 CERROR("%s: can't disconnect %s: no exports found\n",
1513                        obd->obd_name, uuid);
1514         } else {
1515                 CWARN("%s: evicting %s at adminstrative request\n",
1516                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1517                 class_fail_export(doomed_exp);
1518                 class_export_put(doomed_exp);
1519                 exports_evicted++;
1520         }
1521         cfs_hash_putref(uuid_hash);
1522
1523         return exports_evicted;
1524 }
1525 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1526
1527 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1528 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1529 EXPORT_SYMBOL(class_export_dump_hook);
1530 #endif
1531
1532 static void print_export_data(struct obd_export *exp, const char *status,
1533                               int locks)
1534 {
1535         struct ptlrpc_reply_state *rs;
1536         struct ptlrpc_reply_state *first_reply = NULL;
1537         int nreplies = 0;
1538
1539         spin_lock(&exp->exp_lock);
1540         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1541                             rs_exp_list) {
1542                 if (nreplies == 0)
1543                         first_reply = rs;
1544                 nreplies++;
1545         }
1546         spin_unlock(&exp->exp_lock);
1547
1548         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1549                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1550                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1551                atomic_read(&exp->exp_rpc_count),
1552                atomic_read(&exp->exp_cb_count),
1553                atomic_read(&exp->exp_locks_count),
1554                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1555                nreplies, first_reply, nreplies > 3 ? "..." : "",
1556                exp->exp_last_committed);
1557 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1558         if (locks && class_export_dump_hook != NULL)
1559                 class_export_dump_hook(exp);
1560 #endif
1561 }
1562
1563 void dump_exports(struct obd_device *obd, int locks)
1564 {
1565         struct obd_export *exp;
1566
1567         spin_lock(&obd->obd_dev_lock);
1568         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1569                 print_export_data(exp, "ACTIVE", locks);
1570         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1571                 print_export_data(exp, "UNLINKED", locks);
1572         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1573                 print_export_data(exp, "DELAYED", locks);
1574         spin_unlock(&obd->obd_dev_lock);
1575         spin_lock(&obd_zombie_impexp_lock);
1576         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1577                 print_export_data(exp, "ZOMBIE", locks);
1578         spin_unlock(&obd_zombie_impexp_lock);
1579 }
1580 EXPORT_SYMBOL(dump_exports);
1581
1582 void obd_exports_barrier(struct obd_device *obd)
1583 {
1584         int waited = 2;
1585         LASSERT(list_empty(&obd->obd_exports));
1586         spin_lock(&obd->obd_dev_lock);
1587         while (!list_empty(&obd->obd_unlinked_exports)) {
1588                 spin_unlock(&obd->obd_dev_lock);
1589                 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1590                                                    cfs_time_seconds(waited));
1591                 if (waited > 5 && IS_PO2(waited)) {
1592                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1593                                       "more than %d seconds. "
1594                                       "The obd refcount = %d. Is it stuck?\n",
1595                                       obd->obd_name, waited,
1596                                       atomic_read(&obd->obd_refcount));
1597                         dump_exports(obd, 1);
1598                 }
1599                 waited *= 2;
1600                 spin_lock(&obd->obd_dev_lock);
1601         }
1602         spin_unlock(&obd->obd_dev_lock);
1603 }
1604 EXPORT_SYMBOL(obd_exports_barrier);
1605
/* Number of zombie imports and exports still waiting to be destroyed;
 * modified under obd_zombie_impexp_lock.  Starts at zero. */
static int zombies_count;
1608
1609 /**
1610  * kill zombie imports and exports
1611  */
1612 void obd_zombie_impexp_cull(void)
1613 {
1614         struct obd_import *import;
1615         struct obd_export *export;
1616         ENTRY;
1617
1618         do {
1619                 spin_lock(&obd_zombie_impexp_lock);
1620
1621                 import = NULL;
1622                 if (!list_empty(&obd_zombie_imports)) {
1623                         import = list_entry(obd_zombie_imports.next,
1624                                             struct obd_import,
1625                                             imp_zombie_chain);
1626                         list_del_init(&import->imp_zombie_chain);
1627                 }
1628
1629                 export = NULL;
1630                 if (!list_empty(&obd_zombie_exports)) {
1631                         export = list_entry(obd_zombie_exports.next,
1632                                             struct obd_export,
1633                                             exp_obd_chain);
1634                         list_del_init(&export->exp_obd_chain);
1635                 }
1636
1637                 spin_unlock(&obd_zombie_impexp_lock);
1638
1639                 if (import != NULL) {
1640                         class_import_destroy(import);
1641                         spin_lock(&obd_zombie_impexp_lock);
1642                         zombies_count--;
1643                         spin_unlock(&obd_zombie_impexp_lock);
1644                 }
1645
1646                 if (export != NULL) {
1647                         class_export_destroy(export);
1648                         spin_lock(&obd_zombie_impexp_lock);
1649                         zombies_count--;
1650                         spin_unlock(&obd_zombie_impexp_lock);
1651                 }
1652
1653                 cond_resched();
1654         } while (import != NULL || export != NULL);
1655         EXIT;
1656 }
1657
1658 static struct completion        obd_zombie_start;
1659 static struct completion        obd_zombie_stop;
1660 static unsigned long            obd_zombie_flags;
1661 static wait_queue_head_t        obd_zombie_waitq;
1662 static pid_t                    obd_zombie_pid;
1663
1664 enum {
1665         OBD_ZOMBIE_STOP         = 0x0001,
1666 };
1667
1668 /**
1669  * check for work for kill zombie import/export thread.
1670  */
1671 static int obd_zombie_impexp_check(void *arg)
1672 {
1673         int rc;
1674
1675         spin_lock(&obd_zombie_impexp_lock);
1676         rc = (zombies_count == 0) &&
1677              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1678         spin_unlock(&obd_zombie_impexp_lock);
1679
1680         RETURN(rc);
1681 }
1682
1683 /**
1684  * Add export to the obd_zombe thread and notify it.
1685  */
1686 static void obd_zombie_export_add(struct obd_export *exp) {
1687         spin_lock(&exp->exp_obd->obd_dev_lock);
1688         LASSERT(!list_empty(&exp->exp_obd_chain));
1689         list_del_init(&exp->exp_obd_chain);
1690         spin_unlock(&exp->exp_obd->obd_dev_lock);
1691         spin_lock(&obd_zombie_impexp_lock);
1692         zombies_count++;
1693         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1694         spin_unlock(&obd_zombie_impexp_lock);
1695
1696         obd_zombie_impexp_notify();
1697 }
1698
1699 /**
1700  * Add import to the obd_zombe thread and notify it.
1701  */
1702 static void obd_zombie_import_add(struct obd_import *imp) {
1703         LASSERT(imp->imp_sec == NULL);
1704         LASSERT(imp->imp_rq_pool == NULL);
1705         spin_lock(&obd_zombie_impexp_lock);
1706         LASSERT(list_empty(&imp->imp_zombie_chain));
1707         zombies_count++;
1708         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1709         spin_unlock(&obd_zombie_impexp_lock);
1710
1711         obd_zombie_impexp_notify();
1712 }
1713
1714 /**
1715  * notify import/export destroy thread about new zombie.
1716  */
1717 static void obd_zombie_impexp_notify(void)
1718 {
1719         /*
1720          * Make sure obd_zomebie_impexp_thread get this notification.
1721          * It is possible this signal only get by obd_zombie_barrier, and
1722          * barrier gulps this notification and sleeps away and hangs ensues
1723          */
1724         wake_up_all(&obd_zombie_waitq);
1725 }
1726
1727 /**
1728  * check whether obd_zombie is idle
1729  */
1730 static int obd_zombie_is_idle(void)
1731 {
1732         int rc;
1733
1734         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1735         spin_lock(&obd_zombie_impexp_lock);
1736         rc = (zombies_count == 0);
1737         spin_unlock(&obd_zombie_impexp_lock);
1738         return rc;
1739 }
1740
1741 /**
1742  * wait when obd_zombie import/export queues become empty
1743  */
1744 void obd_zombie_barrier(void)
1745 {
1746         struct l_wait_info lwi = { 0 };
1747
1748         if (obd_zombie_pid == current_pid())
1749                 /* don't wait for myself */
1750                 return;
1751         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1752 }
1753 EXPORT_SYMBOL(obd_zombie_barrier);
1754
1755
1756 /**
1757  * destroy zombie export/import thread.
1758  */
1759 static int obd_zombie_impexp_thread(void *unused)
1760 {
1761         unshare_fs_struct();
1762         complete(&obd_zombie_start);
1763
1764         obd_zombie_pid = current_pid();
1765
1766         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1767                 struct l_wait_info lwi = { 0 };
1768
1769                 l_wait_event(obd_zombie_waitq,
1770                              !obd_zombie_impexp_check(NULL), &lwi);
1771                 obd_zombie_impexp_cull();
1772
1773                 /*
1774                  * Notify obd_zombie_barrier callers that queues
1775                  * may be empty.
1776                  */
1777                 wake_up(&obd_zombie_waitq);
1778         }
1779
1780         complete(&obd_zombie_stop);
1781
1782         RETURN(0);
1783 }
1784
1785
1786 /**
1787  * start destroy zombie import/export thread
1788  */
1789 int obd_zombie_impexp_init(void)
1790 {
1791         struct task_struct *task;
1792
1793         INIT_LIST_HEAD(&obd_zombie_imports);
1794
1795         INIT_LIST_HEAD(&obd_zombie_exports);
1796         spin_lock_init(&obd_zombie_impexp_lock);
1797         init_completion(&obd_zombie_start);
1798         init_completion(&obd_zombie_stop);
1799         init_waitqueue_head(&obd_zombie_waitq);
1800         obd_zombie_pid = 0;
1801
1802         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1803         if (IS_ERR(task))
1804                 RETURN(PTR_ERR(task));
1805
1806         wait_for_completion(&obd_zombie_start);
1807         RETURN(0);
1808 }
1809 /**
1810  * stop destroy zombie import/export thread
1811  */
1812 void obd_zombie_impexp_stop(void)
1813 {
1814         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1815         obd_zombie_impexp_notify();
1816         wait_for_completion(&obd_zombie_stop);
1817 }
1818
1819 /***** Kernel-userspace comm helpers *******/
1820
1821 /* Get length of entire message, including header */
1822 int kuc_len(int payload_len)
1823 {
1824         return sizeof(struct kuc_hdr) + payload_len;
1825 }
1826 EXPORT_SYMBOL(kuc_len);
1827
1828 /* Get a pointer to kuc header, given a ptr to the payload
1829  * @param p Pointer to payload area
1830  * @returns Pointer to kuc header
1831  */
1832 struct kuc_hdr * kuc_ptr(void *p)
1833 {
1834         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1835         LASSERT(lh->kuc_magic == KUC_MAGIC);
1836         return lh;
1837 }
1838 EXPORT_SYMBOL(kuc_ptr);
1839
1840 /* Test if payload is part of kuc message
1841  * @param p Pointer to payload area
1842  * @returns boolean
1843  */
1844 int kuc_ispayload(void *p)
1845 {
1846         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1847
1848         if (kh->kuc_magic == KUC_MAGIC)
1849                 return 1;
1850         else
1851                 return 0;
1852 }
1853 EXPORT_SYMBOL(kuc_ispayload);
1854
1855 /* Alloc space for a message, and fill in header
1856  * @return Pointer to payload area
1857  */
1858 void *kuc_alloc(int payload_len, int transport, int type)
1859 {
1860         struct kuc_hdr *lh;
1861         int len = kuc_len(payload_len);
1862
1863         OBD_ALLOC(lh, len);
1864         if (lh == NULL)
1865                 return ERR_PTR(-ENOMEM);
1866
1867         lh->kuc_magic = KUC_MAGIC;
1868         lh->kuc_transport = transport;
1869         lh->kuc_msgtype = type;
1870         lh->kuc_msglen = len;
1871
1872         return (void *)(lh + 1);
1873 }
1874 EXPORT_SYMBOL(kuc_alloc);
1875
/* Free a message previously obtained from kuc_alloc().
 *
 * Deliberately not declared "inline": this function is exported to other
 * modules, and a non-static function defined with a bare "inline" is not
 * guaranteed to get an external definition under C99/C11 inline semantics.
 *
 * @param p Pointer to payload area (not to the header)
 * @param payload_len Payload size in bytes, as passed to kuc_alloc()
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1883
1884 struct obd_request_slot_waiter {
1885         struct list_head        orsw_entry;
1886         wait_queue_head_t       orsw_waitq;
1887         bool                    orsw_signaled;
1888 };
1889
1890 static bool obd_request_slot_avail(struct client_obd *cli,
1891                                    struct obd_request_slot_waiter *orsw)
1892 {
1893         bool avail;
1894
1895         client_obd_list_lock(&cli->cl_loi_list_lock);
1896         avail = !!list_empty(&orsw->orsw_entry);
1897         client_obd_list_unlock(&cli->cl_loi_list_lock);
1898
1899         return avail;
1900 };
1901
1902 /*
1903  * For network flow control, the RPC sponsor needs to acquire a credit
1904  * before sending the RPC. The credits count for a connection is defined
1905  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1906  * the subsequent RPC sponsors need to wait until others released their
1907  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1908  */
1909 int obd_get_request_slot(struct client_obd *cli)
1910 {
1911         struct obd_request_slot_waiter   orsw;
1912         struct l_wait_info               lwi;
1913         int                              rc;
1914
1915         client_obd_list_lock(&cli->cl_loi_list_lock);
1916         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1917                 cli->cl_r_in_flight++;
1918                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1919                 return 0;
1920         }
1921
1922         init_waitqueue_head(&orsw.orsw_waitq);
1923         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1924         orsw.orsw_signaled = false;
1925         client_obd_list_unlock(&cli->cl_loi_list_lock);
1926
1927         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1928         rc = l_wait_event(orsw.orsw_waitq,
1929                           obd_request_slot_avail(cli, &orsw) ||
1930                           orsw.orsw_signaled,
1931                           &lwi);
1932
1933         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1934          * freed but other (such as obd_put_request_slot) is using it. */
1935         client_obd_list_lock(&cli->cl_loi_list_lock);
1936         if (rc != 0) {
1937                 if (!orsw.orsw_signaled) {
1938                         if (list_empty(&orsw.orsw_entry))
1939                                 cli->cl_r_in_flight--;
1940                         else
1941                                 list_del(&orsw.orsw_entry);
1942                 }
1943         }
1944
1945         if (orsw.orsw_signaled) {
1946                 LASSERT(list_empty(&orsw.orsw_entry));
1947
1948                 rc = -EINTR;
1949         }
1950         client_obd_list_unlock(&cli->cl_loi_list_lock);
1951
1952         return rc;
1953 }
1954 EXPORT_SYMBOL(obd_get_request_slot);
1955
1956 void obd_put_request_slot(struct client_obd *cli)
1957 {
1958         struct obd_request_slot_waiter *orsw;
1959
1960         client_obd_list_lock(&cli->cl_loi_list_lock);
1961         cli->cl_r_in_flight--;
1962
1963         /* If there is free slot, wakeup the first waiter. */
1964         if (!list_empty(&cli->cl_loi_read_list) &&
1965             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
1966                 orsw = list_entry(cli->cl_loi_read_list.next,
1967                                   struct obd_request_slot_waiter, orsw_entry);
1968                 list_del_init(&orsw->orsw_entry);
1969                 cli->cl_r_in_flight++;
1970                 wake_up(&orsw->orsw_waitq);
1971         }
1972         client_obd_list_unlock(&cli->cl_loi_list_lock);
1973 }
1974 EXPORT_SYMBOL(obd_put_request_slot);
1975
1976 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1977 {
1978         return cli->cl_max_rpcs_in_flight;
1979 }
1980 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
1981
1982 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1983 {
1984         struct obd_request_slot_waiter *orsw;
1985         __u32                           old;
1986         int                             diff;
1987         int                             i;
1988
1989         if (max > OBD_MAX_RIF_MAX || max < 1)
1990                 return -ERANGE;
1991
1992         client_obd_list_lock(&cli->cl_loi_list_lock);
1993         old = cli->cl_max_rpcs_in_flight;
1994         cli->cl_max_rpcs_in_flight = max;
1995         diff = max - old;
1996
1997         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
1998         for (i = 0; i < diff; i++) {
1999                 if (list_empty(&cli->cl_loi_read_list))
2000                         break;
2001
2002                 orsw = list_entry(cli->cl_loi_read_list.next,
2003                                   struct obd_request_slot_waiter, orsw_entry);
2004                 list_del_init(&orsw->orsw_entry);
2005                 cli->cl_r_in_flight++;
2006                 wake_up(&orsw->orsw_waitq);
2007         }
2008         client_obd_list_unlock(&cli->cl_loi_list_lock);
2009
2010         return 0;
2011 }
2012 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);