Whamcloud - gitweb
LU-5859 llog: do not cleanup orphans in remote catalogs
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include <obd_class.h>
44 #include <lprocfs_status.h>
45
46 spinlock_t obd_types_lock;
47
48 static struct kmem_cache *obd_device_cachep;
49 struct kmem_cache *obdo_cachep;
50 EXPORT_SYMBOL(obdo_cachep);
51 static struct kmem_cache *import_cachep;
52
53 static struct list_head obd_zombie_imports;
54 static struct list_head obd_zombie_exports;
55 static spinlock_t  obd_zombie_impexp_lock;
56
57 static void obd_zombie_impexp_notify(void);
58 static void obd_zombie_export_add(struct obd_export *exp);
59 static void obd_zombie_import_add(struct obd_import *imp);
60 static void print_export_data(struct obd_export *exp,
61                               const char *status, int locks);
62
63 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
64 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
65
66 /*
67  * support functions: we could use inter-module communication, but this
68  * is more portable to other OS's
69  */
70 static struct obd_device *obd_device_alloc(void)
71 {
72         struct obd_device *obd;
73
74         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
75         if (obd != NULL) {
76                 obd->obd_magic = OBD_DEVICE_MAGIC;
77         }
78         return obd;
79 }
80
81 static void obd_device_free(struct obd_device *obd)
82 {
83         LASSERT(obd != NULL);
84         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
85                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
86         if (obd->obd_namespace != NULL) {
87                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
88                        obd, obd->obd_namespace, obd->obd_force);
89                 LBUG();
90         }
91         lu_ref_fini(&obd->obd_reference);
92         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
93 }
94
95 struct obd_type *class_search_type(const char *name)
96 {
97         struct list_head *tmp;
98         struct obd_type *type;
99
100         spin_lock(&obd_types_lock);
101         list_for_each(tmp, &obd_types) {
102                 type = list_entry(tmp, struct obd_type, typ_chain);
103                 if (strcmp(type->typ_name, name) == 0) {
104                         spin_unlock(&obd_types_lock);
105                         return type;
106                 }
107         }
108         spin_unlock(&obd_types_lock);
109         return NULL;
110 }
111 EXPORT_SYMBOL(class_search_type);
112
113 struct obd_type *class_get_type(const char *name)
114 {
115         struct obd_type *type = class_search_type(name);
116
117 #ifdef HAVE_MODULE_LOADING_SUPPORT
118         if (!type) {
119                 const char *modname = name;
120
121                 if (strcmp(modname, "obdfilter") == 0)
122                         modname = "ofd";
123
124                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
125                         modname = LUSTRE_OSP_NAME;
126
127                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
128                         modname = LUSTRE_MDT_NAME;
129
130                 if (!request_module("%s", modname)) {
131                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
132                         type = class_search_type(name);
133                 } else {
134                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
135                                            modname);
136                 }
137         }
138 #endif
139         if (type) {
140                 spin_lock(&type->obd_type_lock);
141                 type->typ_refcnt++;
142                 try_module_get(type->typ_dt_ops->o_owner);
143                 spin_unlock(&type->obd_type_lock);
144         }
145         return type;
146 }
147 EXPORT_SYMBOL(class_get_type);
148
149 void class_put_type(struct obd_type *type)
150 {
151         LASSERT(type);
152         spin_lock(&type->obd_type_lock);
153         type->typ_refcnt--;
154         module_put(type->typ_dt_ops->o_owner);
155         spin_unlock(&type->obd_type_lock);
156 }
157 EXPORT_SYMBOL(class_put_type);
158
159 #define CLASS_MAX_NAME 1024
160
161 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
162                         bool enable_proc, struct lprocfs_vars *vars,
163                         const char *name, struct lu_device_type *ldt)
164 {
165         struct obd_type *type;
166         int rc = 0;
167         ENTRY;
168
169         /* sanity check */
170         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
171
172         if (class_search_type(name)) {
173                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
174                 RETURN(-EEXIST);
175         }
176
177         rc = -ENOMEM;
178         OBD_ALLOC(type, sizeof(*type));
179         if (type == NULL)
180                 RETURN(rc);
181
182         OBD_ALLOC_PTR(type->typ_dt_ops);
183         OBD_ALLOC_PTR(type->typ_md_ops);
184         OBD_ALLOC(type->typ_name, strlen(name) + 1);
185
186         if (type->typ_dt_ops == NULL ||
187             type->typ_md_ops == NULL ||
188             type->typ_name == NULL)
189                 GOTO (failed, rc);
190
191         *(type->typ_dt_ops) = *dt_ops;
192         /* md_ops is optional */
193         if (md_ops)
194                 *(type->typ_md_ops) = *md_ops;
195         strcpy(type->typ_name, name);
196         spin_lock_init(&type->obd_type_lock);
197
198 #ifdef CONFIG_PROC_FS
199         if (enable_proc) {
200                 type->typ_procroot = lprocfs_register(type->typ_name,
201                                                       proc_lustre_root,
202                                                       vars, type);
203                 if (IS_ERR(type->typ_procroot)) {
204                         rc = PTR_ERR(type->typ_procroot);
205                         type->typ_procroot = NULL;
206                         GOTO(failed, rc);
207                 }
208         }
209 #endif
210         if (ldt != NULL) {
211                 type->typ_lu = ldt;
212                 rc = lu_device_type_init(ldt);
213                 if (rc != 0)
214                         GOTO (failed, rc);
215         }
216
217         spin_lock(&obd_types_lock);
218         list_add(&type->typ_chain, &obd_types);
219         spin_unlock(&obd_types_lock);
220
221         RETURN (0);
222
223 failed:
224         if (type->typ_name != NULL) {
225 #ifdef CONFIG_PROC_FS
226                 if (type->typ_procroot != NULL)
227                         remove_proc_subtree(type->typ_name, proc_lustre_root);
228 #endif
229                 OBD_FREE(type->typ_name, strlen(name) + 1);
230         }
231         if (type->typ_md_ops != NULL)
232                 OBD_FREE_PTR(type->typ_md_ops);
233         if (type->typ_dt_ops != NULL)
234                 OBD_FREE_PTR(type->typ_dt_ops);
235         OBD_FREE(type, sizeof(*type));
236         RETURN(rc);
237 }
238 EXPORT_SYMBOL(class_register_type);
239
240 int class_unregister_type(const char *name)
241 {
242         struct obd_type *type = class_search_type(name);
243         ENTRY;
244
245         if (!type) {
246                 CERROR("unknown obd type\n");
247                 RETURN(-EINVAL);
248         }
249
250         if (type->typ_refcnt) {
251                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
252                 /* This is a bad situation, let's make the best of it */
253                 /* Remove ops, but leave the name for debugging */
254                 OBD_FREE_PTR(type->typ_dt_ops);
255                 OBD_FREE_PTR(type->typ_md_ops);
256                 RETURN(-EBUSY);
257         }
258
259         /* we do not use type->typ_procroot as for compatibility purposes
260          * other modules can share names (i.e. lod can use lov entry). so
261          * we can't reference pointer as it can get invalided when another
262          * module removes the entry */
263 #ifdef CONFIG_PROC_FS
264         if (type->typ_procroot != NULL)
265                 remove_proc_subtree(type->typ_name, proc_lustre_root);
266         if (type->typ_procsym != NULL)
267                 lprocfs_remove(&type->typ_procsym);
268 #endif
269         if (type->typ_lu)
270                 lu_device_type_fini(type->typ_lu);
271
272         spin_lock(&obd_types_lock);
273         list_del(&type->typ_chain);
274         spin_unlock(&obd_types_lock);
275         OBD_FREE(type->typ_name, strlen(name) + 1);
276         if (type->typ_dt_ops != NULL)
277                 OBD_FREE_PTR(type->typ_dt_ops);
278         if (type->typ_md_ops != NULL)
279                 OBD_FREE_PTR(type->typ_md_ops);
280         OBD_FREE(type, sizeof(*type));
281         RETURN(0);
282 } /* class_unregister_type */
283 EXPORT_SYMBOL(class_unregister_type);
284
285 /**
286  * Create a new obd device.
287  *
288  * Find an empty slot in ::obd_devs[], create a new obd device in it.
289  *
290  * \param[in] type_name obd device type string.
291  * \param[in] name      obd device name.
292  *
293  * \retval NULL if create fails, otherwise return the obd device
294  *         pointer created.
295  */
296 struct obd_device *class_newdev(const char *type_name, const char *name)
297 {
298         struct obd_device *result = NULL;
299         struct obd_device *newdev;
300         struct obd_type *type = NULL;
301         int i;
302         int new_obd_minor = 0;
303         ENTRY;
304
305         if (strlen(name) >= MAX_OBD_NAME) {
306                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
307                 RETURN(ERR_PTR(-EINVAL));
308         }
309
310         type = class_get_type(type_name);
311         if (type == NULL){
312                 CERROR("OBD: unknown type: %s\n", type_name);
313                 RETURN(ERR_PTR(-ENODEV));
314         }
315
316         newdev = obd_device_alloc();
317         if (newdev == NULL)
318                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
319
320         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
321
322         write_lock(&obd_dev_lock);
323         for (i = 0; i < class_devno_max(); i++) {
324                 struct obd_device *obd = class_num2obd(i);
325
326                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
327                         CERROR("Device %s already exists at %d, won't add\n",
328                                name, i);
329                         if (result) {
330                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
331                                          "%p obd_magic %08x != %08x\n", result,
332                                          result->obd_magic, OBD_DEVICE_MAGIC);
333                                 LASSERTF(result->obd_minor == new_obd_minor,
334                                          "%p obd_minor %d != %d\n", result,
335                                          result->obd_minor, new_obd_minor);
336
337                                 obd_devs[result->obd_minor] = NULL;
338                                 result->obd_name[0]='\0';
339                          }
340                         result = ERR_PTR(-EEXIST);
341                         break;
342                 }
343                 if (!result && !obd) {
344                         result = newdev;
345                         result->obd_minor = i;
346                         new_obd_minor = i;
347                         result->obd_type = type;
348                         strncpy(result->obd_name, name,
349                                 sizeof(result->obd_name) - 1);
350                         obd_devs[i] = result;
351                 }
352         }
353         write_unlock(&obd_dev_lock);
354
355         if (result == NULL && i >= class_devno_max()) {
356                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
357                        class_devno_max());
358                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
359         }
360
361         if (IS_ERR(result))
362                 GOTO(out, result);
363
364         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
365                result->obd_name, result);
366
367         RETURN(result);
368 out:
369         obd_device_free(newdev);
370 out_type:
371         class_put_type(type);
372         return result;
373 }
374
375 void class_release_dev(struct obd_device *obd)
376 {
377         struct obd_type *obd_type = obd->obd_type;
378
379         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
380                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
381         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
382                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
383         LASSERT(obd_type != NULL);
384
385         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
386                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
387
388         write_lock(&obd_dev_lock);
389         obd_devs[obd->obd_minor] = NULL;
390         write_unlock(&obd_dev_lock);
391         obd_device_free(obd);
392
393         class_put_type(obd_type);
394 }
395
396 int class_name2dev(const char *name)
397 {
398         int i;
399
400         if (!name)
401                 return -1;
402
403         read_lock(&obd_dev_lock);
404         for (i = 0; i < class_devno_max(); i++) {
405                 struct obd_device *obd = class_num2obd(i);
406
407                 if (obd && strcmp(name, obd->obd_name) == 0) {
408                         /* Make sure we finished attaching before we give
409                            out any references */
410                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
411                         if (obd->obd_attached) {
412                                 read_unlock(&obd_dev_lock);
413                                 return i;
414                         }
415                         break;
416                 }
417         }
418         read_unlock(&obd_dev_lock);
419
420         return -1;
421 }
422 EXPORT_SYMBOL(class_name2dev);
423
424 struct obd_device *class_name2obd(const char *name)
425 {
426         int dev = class_name2dev(name);
427
428         if (dev < 0 || dev > class_devno_max())
429                 return NULL;
430         return class_num2obd(dev);
431 }
432 EXPORT_SYMBOL(class_name2obd);
433
434 int class_uuid2dev(struct obd_uuid *uuid)
435 {
436         int i;
437
438         read_lock(&obd_dev_lock);
439         for (i = 0; i < class_devno_max(); i++) {
440                 struct obd_device *obd = class_num2obd(i);
441
442                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
443                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
444                         read_unlock(&obd_dev_lock);
445                         return i;
446                 }
447         }
448         read_unlock(&obd_dev_lock);
449
450         return -1;
451 }
452 EXPORT_SYMBOL(class_uuid2dev);
453
454 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
455 {
456         int dev = class_uuid2dev(uuid);
457         if (dev < 0)
458                 return NULL;
459         return class_num2obd(dev);
460 }
461 EXPORT_SYMBOL(class_uuid2obd);
462
463 /**
464  * Get obd device from ::obd_devs[]
465  *
466  * \param num [in] array index
467  *
468  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
469  *         otherwise return the obd device there.
470  */
471 struct obd_device *class_num2obd(int num)
472 {
473         struct obd_device *obd = NULL;
474
475         if (num < class_devno_max()) {
476                 obd = obd_devs[num];
477                 if (obd == NULL)
478                         return NULL;
479
480                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
481                          "%p obd_magic %08x != %08x\n",
482                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
483                 LASSERTF(obd->obd_minor == num,
484                          "%p obd_minor %0d != %0d\n",
485                          obd, obd->obd_minor, num);
486         }
487
488         return obd;
489 }
490 EXPORT_SYMBOL(class_num2obd);
491
492 /**
493  * Get obd devices count. Device in any
494  *    state are counted
495  * \retval obd device count
496  */
497 int get_devices_count(void)
498 {
499         int index, max_index = class_devno_max(), dev_count = 0;
500
501         read_lock(&obd_dev_lock);
502         for (index = 0; index <= max_index; index++) {
503                 struct obd_device *obd = class_num2obd(index);
504                 if (obd != NULL)
505                         dev_count++;
506         }
507         read_unlock(&obd_dev_lock);
508
509         return dev_count;
510 }
511 EXPORT_SYMBOL(get_devices_count);
512
513 void class_obd_list(void)
514 {
515         char *status;
516         int i;
517
518         read_lock(&obd_dev_lock);
519         for (i = 0; i < class_devno_max(); i++) {
520                 struct obd_device *obd = class_num2obd(i);
521
522                 if (obd == NULL)
523                         continue;
524                 if (obd->obd_stopping)
525                         status = "ST";
526                 else if (obd->obd_set_up)
527                         status = "UP";
528                 else if (obd->obd_attached)
529                         status = "AT";
530                 else
531                         status = "--";
532                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
533                          i, status, obd->obd_type->typ_name,
534                          obd->obd_name, obd->obd_uuid.uuid,
535                          atomic_read(&obd->obd_refcount));
536         }
537         read_unlock(&obd_dev_lock);
538         return;
539 }
540
541 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
542    specified, then only the client with that uuid is returned,
543    otherwise any client connected to the tgt is returned. */
544 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
545                                           const char * typ_name,
546                                           struct obd_uuid *grp_uuid)
547 {
548         int i;
549
550         read_lock(&obd_dev_lock);
551         for (i = 0; i < class_devno_max(); i++) {
552                 struct obd_device *obd = class_num2obd(i);
553
554                 if (obd == NULL)
555                         continue;
556                 if ((strncmp(obd->obd_type->typ_name, typ_name,
557                              strlen(typ_name)) == 0)) {
558                         if (obd_uuid_equals(tgt_uuid,
559                                             &obd->u.cli.cl_target_uuid) &&
560                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
561                                                          &obd->obd_uuid) : 1)) {
562                                 read_unlock(&obd_dev_lock);
563                                 return obd;
564                         }
565                 }
566         }
567         read_unlock(&obd_dev_lock);
568
569         return NULL;
570 }
571 EXPORT_SYMBOL(class_find_client_obd);
572
573 /* Iterate the obd_device list looking devices have grp_uuid. Start
574    searching at *next, and if a device is found, the next index to look
575    at is saved in *next. If next is NULL, then the first matching device
576    will always be returned. */
577 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
578 {
579         int i;
580
581         if (next == NULL)
582                 i = 0;
583         else if (*next >= 0 && *next < class_devno_max())
584                 i = *next;
585         else
586                 return NULL;
587
588         read_lock(&obd_dev_lock);
589         for (; i < class_devno_max(); i++) {
590                 struct obd_device *obd = class_num2obd(i);
591
592                 if (obd == NULL)
593                         continue;
594                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
595                         if (next != NULL)
596                                 *next = i+1;
597                         read_unlock(&obd_dev_lock);
598                         return obd;
599                 }
600         }
601         read_unlock(&obd_dev_lock);
602
603         return NULL;
604 }
605 EXPORT_SYMBOL(class_devices_in_group);
606
607 /**
608  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
609  * adjust sptlrpc settings accordingly.
610  */
611 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
612 {
613         struct obd_device  *obd;
614         const char         *type;
615         int                 i, rc = 0, rc2;
616
617         LASSERT(namelen > 0);
618
619         read_lock(&obd_dev_lock);
620         for (i = 0; i < class_devno_max(); i++) {
621                 obd = class_num2obd(i);
622
623                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
624                         continue;
625
626                 /* only notify mdc, osc, mdt, ost */
627                 type = obd->obd_type->typ_name;
628                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
629                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
630                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
631                     strcmp(type, LUSTRE_OST_NAME) != 0)
632                         continue;
633
634                 if (strncmp(obd->obd_name, fsname, namelen))
635                         continue;
636
637                 class_incref(obd, __FUNCTION__, obd);
638                 read_unlock(&obd_dev_lock);
639                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
640                                          sizeof(KEY_SPTLRPC_CONF),
641                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
642                 rc = rc ? rc : rc2;
643                 class_decref(obd, __FUNCTION__, obd);
644                 read_lock(&obd_dev_lock);
645         }
646         read_unlock(&obd_dev_lock);
647         return rc;
648 }
649 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
650
651 void obd_cleanup_caches(void)
652 {
653         ENTRY;
654         if (obd_device_cachep) {
655                 kmem_cache_destroy(obd_device_cachep);
656                 obd_device_cachep = NULL;
657         }
658         if (obdo_cachep) {
659                 kmem_cache_destroy(obdo_cachep);
660                 obdo_cachep = NULL;
661         }
662         if (import_cachep) {
663                 kmem_cache_destroy(import_cachep);
664                 import_cachep = NULL;
665         }
666         if (capa_cachep) {
667                 kmem_cache_destroy(capa_cachep);
668                 capa_cachep = NULL;
669         }
670         EXIT;
671 }
672
673 int obd_init_caches(void)
674 {
675         int rc;
676         ENTRY;
677
678         LASSERT(obd_device_cachep == NULL);
679         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
680                                               sizeof(struct obd_device),
681                                               0, 0, NULL);
682         if (!obd_device_cachep)
683                 GOTO(out, rc = -ENOMEM);
684
685         LASSERT(obdo_cachep == NULL);
686         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
687                                         0, 0, NULL);
688         if (!obdo_cachep)
689                 GOTO(out, rc = -ENOMEM);
690
691         LASSERT(import_cachep == NULL);
692         import_cachep = kmem_cache_create("ll_import_cache",
693                                           sizeof(struct obd_import),
694                                           0, 0, NULL);
695         if (!import_cachep)
696                 GOTO(out, rc = -ENOMEM);
697
698         LASSERT(capa_cachep == NULL);
699         capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
700                                         0, 0, NULL);
701         if (!capa_cachep)
702                 GOTO(out, rc = -ENOMEM);
703
704         RETURN(0);
705 out:
706         obd_cleanup_caches();
707         RETURN(rc);
708 }
709
710 /* map connection to client */
711 struct obd_export *class_conn2export(struct lustre_handle *conn)
712 {
713         struct obd_export *export;
714         ENTRY;
715
716         if (!conn) {
717                 CDEBUG(D_CACHE, "looking for null handle\n");
718                 RETURN(NULL);
719         }
720
721         if (conn->cookie == -1) {  /* this means assign a new connection */
722                 CDEBUG(D_CACHE, "want a new connection\n");
723                 RETURN(NULL);
724         }
725
726         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
727         export = class_handle2object(conn->cookie, NULL);
728         RETURN(export);
729 }
730 EXPORT_SYMBOL(class_conn2export);
731
732 struct obd_device *class_exp2obd(struct obd_export *exp)
733 {
734         if (exp)
735                 return exp->exp_obd;
736         return NULL;
737 }
738 EXPORT_SYMBOL(class_exp2obd);
739
740 struct obd_device *class_conn2obd(struct lustre_handle *conn)
741 {
742         struct obd_export *export;
743         export = class_conn2export(conn);
744         if (export) {
745                 struct obd_device *obd = export->exp_obd;
746                 class_export_put(export);
747                 return obd;
748         }
749         return NULL;
750 }
751 EXPORT_SYMBOL(class_conn2obd);
752
753 struct obd_import *class_exp2cliimp(struct obd_export *exp)
754 {
755         struct obd_device *obd = exp->exp_obd;
756         if (obd == NULL)
757                 return NULL;
758         return obd->u.cli.cl_import;
759 }
760 EXPORT_SYMBOL(class_exp2cliimp);
761
762 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
763 {
764         struct obd_device *obd = class_conn2obd(conn);
765         if (obd == NULL)
766                 return NULL;
767         return obd->u.cli.cl_import;
768 }
769 EXPORT_SYMBOL(class_conn2cliimp);
770
771 /* Export management functions */
772 static void class_export_destroy(struct obd_export *exp)
773 {
774         struct obd_device *obd = exp->exp_obd;
775         ENTRY;
776
777         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
778         LASSERT(obd != NULL);
779
780         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
781                exp->exp_client_uuid.uuid, obd->obd_name);
782
783         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
784         if (exp->exp_connection)
785                 ptlrpc_put_connection_superhack(exp->exp_connection);
786
787         LASSERT(list_empty(&exp->exp_outstanding_replies));
788         LASSERT(list_empty(&exp->exp_uncommitted_replies));
789         LASSERT(list_empty(&exp->exp_req_replay_queue));
790         LASSERT(list_empty(&exp->exp_hp_rpcs));
791         obd_destroy_export(exp);
792         class_decref(obd, "export", exp);
793
794         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
795         EXIT;
796 }
797
798 static void export_handle_addref(void *export)
799 {
800         class_export_get(export);
801 }
802
803 static struct portals_handle_ops export_handle_ops = {
804         .hop_addref = export_handle_addref,
805         .hop_free   = NULL,
806 };
807
808 struct obd_export *class_export_get(struct obd_export *exp)
809 {
810         atomic_inc(&exp->exp_refcount);
811         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
812                atomic_read(&exp->exp_refcount));
813         return exp;
814 }
815 EXPORT_SYMBOL(class_export_get);
816
817 void class_export_put(struct obd_export *exp)
818 {
819         LASSERT(exp != NULL);
820         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
821         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
822                atomic_read(&exp->exp_refcount) - 1);
823
824         if (atomic_dec_and_test(&exp->exp_refcount)) {
825                 LASSERT(!list_empty(&exp->exp_obd_chain));
826                 CDEBUG(D_IOCTL, "final put %p/%s\n",
827                        exp, exp->exp_client_uuid.uuid);
828
829                 /* release nid stat refererence */
830                 lprocfs_exp_cleanup(exp);
831
832                 obd_zombie_export_add(exp);
833         }
834 }
835 EXPORT_SYMBOL(class_export_put);
836
837 /* Creates a new export, adds it to the hash table, and returns a
838  * pointer to it. The refcount is 2: one for the hash reference, and
839  * one for the pointer returned by this function. */
840 struct obd_export *class_new_export(struct obd_device *obd,
841                                     struct obd_uuid *cluuid)
842 {
843         struct obd_export *export;
844         cfs_hash_t *hash = NULL;
845         int rc = 0;
846         ENTRY;
847
848         OBD_ALLOC_PTR(export);
849         if (!export)
850                 return ERR_PTR(-ENOMEM);
851
852         export->exp_conn_cnt = 0;
853         export->exp_lock_hash = NULL;
854         export->exp_flock_hash = NULL;
855         atomic_set(&export->exp_refcount, 2);
856         atomic_set(&export->exp_rpc_count, 0);
857         atomic_set(&export->exp_cb_count, 0);
858         atomic_set(&export->exp_locks_count, 0);
859 #if LUSTRE_TRACKS_LOCK_EXP_REFS
860         INIT_LIST_HEAD(&export->exp_locks_list);
861         spin_lock_init(&export->exp_locks_list_guard);
862 #endif
863         atomic_set(&export->exp_replay_count, 0);
864         export->exp_obd = obd;
865         INIT_LIST_HEAD(&export->exp_outstanding_replies);
866         spin_lock_init(&export->exp_uncommitted_replies_lock);
867         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
868         INIT_LIST_HEAD(&export->exp_req_replay_queue);
869         INIT_LIST_HEAD(&export->exp_handle.h_link);
870         INIT_LIST_HEAD(&export->exp_hp_rpcs);
871         INIT_LIST_HEAD(&export->exp_reg_rpcs);
872         class_handle_hash(&export->exp_handle, &export_handle_ops);
873         export->exp_last_request_time = cfs_time_current_sec();
874         spin_lock_init(&export->exp_lock);
875         spin_lock_init(&export->exp_rpc_lock);
876         INIT_HLIST_NODE(&export->exp_uuid_hash);
877         INIT_HLIST_NODE(&export->exp_nid_hash);
878         spin_lock_init(&export->exp_bl_list_lock);
879         INIT_LIST_HEAD(&export->exp_bl_list);
880
881         export->exp_sp_peer = LUSTRE_SP_ANY;
882         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
883         export->exp_client_uuid = *cluuid;
884         obd_init_export(export);
885
886         spin_lock(&obd->obd_dev_lock);
887         /* shouldn't happen, but might race */
888         if (obd->obd_stopping)
889                 GOTO(exit_unlock, rc = -ENODEV);
890
891         hash = cfs_hash_getref(obd->obd_uuid_hash);
892         if (hash == NULL)
893                 GOTO(exit_unlock, rc = -ENODEV);
894         spin_unlock(&obd->obd_dev_lock);
895
896         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
897                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
898                 if (rc != 0) {
899                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
900                                       obd->obd_name, cluuid->uuid, rc);
901                         GOTO(exit_err, rc = -EALREADY);
902                 }
903         }
904
905         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
906         spin_lock(&obd->obd_dev_lock);
907         if (obd->obd_stopping) {
908                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
909                 GOTO(exit_unlock, rc = -ENODEV);
910         }
911
912         class_incref(obd, "export", export);
913         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
914         list_add_tail(&export->exp_obd_chain_timed,
915                       &export->exp_obd->obd_exports_timed);
916         export->exp_obd->obd_num_exports++;
917         spin_unlock(&obd->obd_dev_lock);
918         cfs_hash_putref(hash);
919         RETURN(export);
920
921 exit_unlock:
922         spin_unlock(&obd->obd_dev_lock);
923 exit_err:
924         if (hash)
925                 cfs_hash_putref(hash);
926         class_handle_unhash(&export->exp_handle);
927         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
928         obd_destroy_export(export);
929         OBD_FREE_PTR(export);
930         return ERR_PTR(rc);
931 }
932 EXPORT_SYMBOL(class_new_export);
933
934 void class_unlink_export(struct obd_export *exp)
935 {
936         class_handle_unhash(&exp->exp_handle);
937
938         spin_lock(&exp->exp_obd->obd_dev_lock);
939         /* delete an uuid-export hashitem from hashtables */
940         if (!hlist_unhashed(&exp->exp_uuid_hash))
941                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
942                              &exp->exp_client_uuid,
943                              &exp->exp_uuid_hash);
944
945         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
946         list_del_init(&exp->exp_obd_chain_timed);
947         exp->exp_obd->obd_num_exports--;
948         spin_unlock(&exp->exp_obd->obd_dev_lock);
949         class_export_put(exp);
950 }
951 EXPORT_SYMBOL(class_unlink_export);
952
953 /* Import management functions */
954 static void class_import_destroy(struct obd_import *imp)
955 {
956         ENTRY;
957
958         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
959                 imp->imp_obd->obd_name);
960
961         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
962
963         ptlrpc_put_connection_superhack(imp->imp_connection);
964
965         while (!list_empty(&imp->imp_conn_list)) {
966                 struct obd_import_conn *imp_conn;
967
968                 imp_conn = list_entry(imp->imp_conn_list.next,
969                                       struct obd_import_conn, oic_item);
970                 list_del_init(&imp_conn->oic_item);
971                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
972                 OBD_FREE(imp_conn, sizeof(*imp_conn));
973         }
974
975         LASSERT(imp->imp_sec == NULL);
976         class_decref(imp->imp_obd, "import", imp);
977         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
978         EXIT;
979 }
980
981 static void import_handle_addref(void *import)
982 {
983         class_import_get(import);
984 }
985
986 static struct portals_handle_ops import_handle_ops = {
987         .hop_addref = import_handle_addref,
988         .hop_free   = NULL,
989 };
990
991 struct obd_import *class_import_get(struct obd_import *import)
992 {
993         atomic_inc(&import->imp_refcount);
994         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
995                atomic_read(&import->imp_refcount),
996                import->imp_obd->obd_name);
997         return import;
998 }
999 EXPORT_SYMBOL(class_import_get);
1000
1001 void class_import_put(struct obd_import *imp)
1002 {
1003         ENTRY;
1004
1005         LASSERT(list_empty(&imp->imp_zombie_chain));
1006         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1007
1008         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1009                atomic_read(&imp->imp_refcount) - 1,
1010                imp->imp_obd->obd_name);
1011
1012         if (atomic_dec_and_test(&imp->imp_refcount)) {
1013                 CDEBUG(D_INFO, "final put import %p\n", imp);
1014                 obd_zombie_import_add(imp);
1015         }
1016
1017         /* catch possible import put race */
1018         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1019         EXIT;
1020 }
1021 EXPORT_SYMBOL(class_import_put);
1022
1023 static void init_imp_at(struct imp_at *at) {
1024         int i;
1025         at_init(&at->iat_net_latency, 0, 0);
1026         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1027                 /* max service estimates are tracked on the server side, so
1028                    don't use the AT history here, just use the last reported
1029                    val. (But keep hist for proc histogram, worst_ever) */
1030                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1031                         AT_FLG_NOHIST);
1032         }
1033 }
1034
1035 struct obd_import *class_new_import(struct obd_device *obd)
1036 {
1037         struct obd_import *imp;
1038
1039         OBD_ALLOC(imp, sizeof(*imp));
1040         if (imp == NULL)
1041                 return NULL;
1042
1043         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1044         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1045         INIT_LIST_HEAD(&imp->imp_replay_list);
1046         INIT_LIST_HEAD(&imp->imp_sending_list);
1047         INIT_LIST_HEAD(&imp->imp_delayed_list);
1048         INIT_LIST_HEAD(&imp->imp_committed_list);
1049         imp->imp_replay_cursor = &imp->imp_committed_list;
1050         spin_lock_init(&imp->imp_lock);
1051         imp->imp_last_success_conn = 0;
1052         imp->imp_state = LUSTRE_IMP_NEW;
1053         imp->imp_obd = class_incref(obd, "import", imp);
1054         mutex_init(&imp->imp_sec_mutex);
1055         init_waitqueue_head(&imp->imp_recovery_waitq);
1056
1057         atomic_set(&imp->imp_refcount, 2);
1058         atomic_set(&imp->imp_unregistering, 0);
1059         atomic_set(&imp->imp_inflight, 0);
1060         atomic_set(&imp->imp_replay_inflight, 0);
1061         atomic_set(&imp->imp_inval_count, 0);
1062         INIT_LIST_HEAD(&imp->imp_conn_list);
1063         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1064         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1065         init_imp_at(&imp->imp_at);
1066
1067         /* the default magic is V2, will be used in connect RPC, and
1068          * then adjusted according to the flags in request/reply. */
1069         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1070
1071         return imp;
1072 }
1073 EXPORT_SYMBOL(class_new_import);
1074
1075 void class_destroy_import(struct obd_import *import)
1076 {
1077         LASSERT(import != NULL);
1078         LASSERT(import != LP_POISON);
1079
1080         class_handle_unhash(&import->imp_handle);
1081
1082         spin_lock(&import->imp_lock);
1083         import->imp_generation++;
1084         spin_unlock(&import->imp_lock);
1085         class_import_put(import);
1086 }
1087 EXPORT_SYMBOL(class_destroy_import);
1088
1089 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1090
1091 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1092 {
1093         spin_lock(&exp->exp_locks_list_guard);
1094
1095         LASSERT(lock->l_exp_refs_nr >= 0);
1096
1097         if (lock->l_exp_refs_target != NULL &&
1098             lock->l_exp_refs_target != exp) {
1099                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1100                               exp, lock, lock->l_exp_refs_target);
1101         }
1102         if ((lock->l_exp_refs_nr ++) == 0) {
1103                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1104                 lock->l_exp_refs_target = exp;
1105         }
1106         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1107                lock, exp, lock->l_exp_refs_nr);
1108         spin_unlock(&exp->exp_locks_list_guard);
1109 }
1110 EXPORT_SYMBOL(__class_export_add_lock_ref);
1111
1112 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1113 {
1114         spin_lock(&exp->exp_locks_list_guard);
1115         LASSERT(lock->l_exp_refs_nr > 0);
1116         if (lock->l_exp_refs_target != exp) {
1117                 LCONSOLE_WARN("lock %p, "
1118                               "mismatching export pointers: %p, %p\n",
1119                               lock, lock->l_exp_refs_target, exp);
1120         }
1121         if (-- lock->l_exp_refs_nr == 0) {
1122                 list_del_init(&lock->l_exp_refs_link);
1123                 lock->l_exp_refs_target = NULL;
1124         }
1125         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1126                lock, exp, lock->l_exp_refs_nr);
1127         spin_unlock(&exp->exp_locks_list_guard);
1128 }
1129 EXPORT_SYMBOL(__class_export_del_lock_ref);
1130 #endif
1131
1132 /* A connection defines an export context in which preallocation can
1133    be managed. This releases the export pointer reference, and returns
1134    the export handle, so the export refcount is 1 when this function
1135    returns. */
1136 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1137                   struct obd_uuid *cluuid)
1138 {
1139         struct obd_export *export;
1140         LASSERT(conn != NULL);
1141         LASSERT(obd != NULL);
1142         LASSERT(cluuid != NULL);
1143         ENTRY;
1144
1145         export = class_new_export(obd, cluuid);
1146         if (IS_ERR(export))
1147                 RETURN(PTR_ERR(export));
1148
1149         conn->cookie = export->exp_handle.h_cookie;
1150         class_export_put(export);
1151
1152         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1153                cluuid->uuid, conn->cookie);
1154         RETURN(0);
1155 }
1156 EXPORT_SYMBOL(class_connect);
1157
1158 /* if export is involved in recovery then clean up related things */
1159 static void class_export_recovery_cleanup(struct obd_export *exp)
1160 {
1161         struct obd_device *obd = exp->exp_obd;
1162
1163         spin_lock(&obd->obd_recovery_task_lock);
1164         if (obd->obd_recovering) {
1165                 if (exp->exp_in_recovery) {
1166                         spin_lock(&exp->exp_lock);
1167                         exp->exp_in_recovery = 0;
1168                         spin_unlock(&exp->exp_lock);
1169                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1170                         atomic_dec(&obd->obd_connected_clients);
1171                 }
1172
1173                 /* if called during recovery then should update
1174                  * obd_stale_clients counter,
1175                  * lightweight exports are not counted */
1176                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1177                         exp->exp_obd->obd_stale_clients++;
1178         }
1179         spin_unlock(&obd->obd_recovery_task_lock);
1180
1181         spin_lock(&exp->exp_lock);
1182         /** Cleanup req replay fields */
1183         if (exp->exp_req_replay_needed) {
1184                 exp->exp_req_replay_needed = 0;
1185
1186                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1187                 atomic_dec(&obd->obd_req_replay_clients);
1188         }
1189
1190         /** Cleanup lock replay data */
1191         if (exp->exp_lock_replay_needed) {
1192                 exp->exp_lock_replay_needed = 0;
1193
1194                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1195                 atomic_dec(&obd->obd_lock_replay_clients);
1196         }
1197         spin_unlock(&exp->exp_lock);
1198 }
1199
1200 /* This function removes 1-3 references from the export:
1201  * 1 - for export pointer passed
1202  * and if disconnect really need
1203  * 2 - removing from hash
1204  * 3 - in client_unlink_export
1205  * The export pointer passed to this function can destroyed */
1206 int class_disconnect(struct obd_export *export)
1207 {
1208         int already_disconnected;
1209         ENTRY;
1210
1211         if (export == NULL) {
1212                 CWARN("attempting to free NULL export %p\n", export);
1213                 RETURN(-EINVAL);
1214         }
1215
1216         spin_lock(&export->exp_lock);
1217         already_disconnected = export->exp_disconnected;
1218         export->exp_disconnected = 1;
1219         spin_unlock(&export->exp_lock);
1220
1221         /* class_cleanup(), abort_recovery(), and class_fail_export()
1222          * all end up in here, and if any of them race we shouldn't
1223          * call extra class_export_puts(). */
1224         if (already_disconnected) {
1225                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1226                 GOTO(no_disconn, already_disconnected);
1227         }
1228
1229         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1230                export->exp_handle.h_cookie);
1231
1232         if (!hlist_unhashed(&export->exp_nid_hash))
1233                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1234                              &export->exp_connection->c_peer.nid,
1235                              &export->exp_nid_hash);
1236
1237         class_export_recovery_cleanup(export);
1238         class_unlink_export(export);
1239 no_disconn:
1240         class_export_put(export);
1241         RETURN(0);
1242 }
1243 EXPORT_SYMBOL(class_disconnect);
1244
1245 /* Return non-zero for a fully connected export */
1246 int class_connected_export(struct obd_export *exp)
1247 {
1248         int connected = 0;
1249
1250         if (exp) {
1251                 spin_lock(&exp->exp_lock);
1252                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1253                 spin_unlock(&exp->exp_lock);
1254         }
1255         return connected;
1256 }
1257 EXPORT_SYMBOL(class_connected_export);
1258
1259 static void class_disconnect_export_list(struct list_head *list,
1260                                          enum obd_option flags)
1261 {
1262         int rc;
1263         struct obd_export *exp;
1264         ENTRY;
1265
1266         /* It's possible that an export may disconnect itself, but
1267          * nothing else will be added to this list. */
1268         while (!list_empty(list)) {
1269                 exp = list_entry(list->next, struct obd_export,
1270                                  exp_obd_chain);
1271                 /* need for safe call CDEBUG after obd_disconnect */
1272                 class_export_get(exp);
1273
1274                 spin_lock(&exp->exp_lock);
1275                 exp->exp_flags = flags;
1276                 spin_unlock(&exp->exp_lock);
1277
1278                 if (obd_uuid_equals(&exp->exp_client_uuid,
1279                                     &exp->exp_obd->obd_uuid)) {
1280                         CDEBUG(D_HA,
1281                                "exp %p export uuid == obd uuid, don't discon\n",
1282                                exp);
1283                         /* Need to delete this now so we don't end up pointing
1284                          * to work_list later when this export is cleaned up. */
1285                         list_del_init(&exp->exp_obd_chain);
1286                         class_export_put(exp);
1287                         continue;
1288                 }
1289
1290                 class_export_get(exp);
1291                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1292                        "last request at "CFS_TIME_T"\n",
1293                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1294                        exp, exp->exp_last_request_time);
1295                 /* release one export reference anyway */
1296                 rc = obd_disconnect(exp);
1297
1298                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1299                        obd_export_nid2str(exp), exp, rc);
1300                 class_export_put(exp);
1301         }
1302         EXIT;
1303 }
1304
1305 void class_disconnect_exports(struct obd_device *obd)
1306 {
1307         struct list_head work_list;
1308         ENTRY;
1309
1310         /* Move all of the exports from obd_exports to a work list, en masse. */
1311         INIT_LIST_HEAD(&work_list);
1312         spin_lock(&obd->obd_dev_lock);
1313         list_splice_init(&obd->obd_exports, &work_list);
1314         list_splice_init(&obd->obd_delayed_exports, &work_list);
1315         spin_unlock(&obd->obd_dev_lock);
1316
1317         if (!list_empty(&work_list)) {
1318                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1319                        "disconnecting them\n", obd->obd_minor, obd);
1320                 class_disconnect_export_list(&work_list,
1321                                              exp_flags_from_obd(obd));
1322         } else
1323                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1324                        obd->obd_minor, obd);
1325         EXIT;
1326 }
1327 EXPORT_SYMBOL(class_disconnect_exports);
1328
1329 /* Remove exports that have not completed recovery.
1330  */
1331 void class_disconnect_stale_exports(struct obd_device *obd,
1332                                     int (*test_export)(struct obd_export *))
1333 {
1334         struct list_head work_list;
1335         struct obd_export *exp, *n;
1336         int evicted = 0;
1337         ENTRY;
1338
1339         INIT_LIST_HEAD(&work_list);
1340         spin_lock(&obd->obd_dev_lock);
1341         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1342                                  exp_obd_chain) {
1343                 /* don't count self-export as client */
1344                 if (obd_uuid_equals(&exp->exp_client_uuid,
1345                                     &exp->exp_obd->obd_uuid))
1346                         continue;
1347
1348                 /* don't evict clients which have no slot in last_rcvd
1349                  * (e.g. lightweight connection) */
1350                 if (exp->exp_target_data.ted_lr_idx == -1)
1351                         continue;
1352
1353                 spin_lock(&exp->exp_lock);
1354                 if (exp->exp_failed || test_export(exp)) {
1355                         spin_unlock(&exp->exp_lock);
1356                         continue;
1357                 }
1358                 exp->exp_failed = 1;
1359                 spin_unlock(&exp->exp_lock);
1360
1361                 list_move(&exp->exp_obd_chain, &work_list);
1362                 evicted++;
1363                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1364                        obd->obd_name, exp->exp_client_uuid.uuid,
1365                        exp->exp_connection == NULL ? "<unknown>" :
1366                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1367                 print_export_data(exp, "EVICTING", 0);
1368         }
1369         spin_unlock(&obd->obd_dev_lock);
1370
1371         if (evicted)
1372                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1373                               obd->obd_name, evicted);
1374
1375         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1376                                                  OBD_OPT_ABORT_RECOV);
1377         EXIT;
1378 }
1379 EXPORT_SYMBOL(class_disconnect_stale_exports);
1380
1381 void class_fail_export(struct obd_export *exp)
1382 {
1383         int rc, already_failed;
1384
1385         spin_lock(&exp->exp_lock);
1386         already_failed = exp->exp_failed;
1387         exp->exp_failed = 1;
1388         spin_unlock(&exp->exp_lock);
1389
1390         if (already_failed) {
1391                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1392                        exp, exp->exp_client_uuid.uuid);
1393                 return;
1394         }
1395
1396         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1397                exp, exp->exp_client_uuid.uuid);
1398
1399         if (obd_dump_on_timeout)
1400                 libcfs_debug_dumplog();
1401
1402         /* need for safe call CDEBUG after obd_disconnect */
1403         class_export_get(exp);
1404
1405         /* Most callers into obd_disconnect are removing their own reference
1406          * (request, for example) in addition to the one from the hash table.
1407          * We don't have such a reference here, so make one. */
1408         class_export_get(exp);
1409         rc = obd_disconnect(exp);
1410         if (rc)
1411                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1412         else
1413                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1414                        exp, exp->exp_client_uuid.uuid);
1415         class_export_put(exp);
1416 }
1417 EXPORT_SYMBOL(class_fail_export);
1418
1419 char *obd_export_nid2str(struct obd_export *exp)
1420 {
1421         if (exp->exp_connection != NULL)
1422                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1423
1424         return "(no nid)";
1425 }
1426 EXPORT_SYMBOL(obd_export_nid2str);
1427
1428 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1429 {
1430         cfs_hash_t *nid_hash;
1431         struct obd_export *doomed_exp = NULL;
1432         int exports_evicted = 0;
1433
1434         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1435
1436         spin_lock(&obd->obd_dev_lock);
1437         /* umount has run already, so evict thread should leave
1438          * its task to umount thread now */
1439         if (obd->obd_stopping) {
1440                 spin_unlock(&obd->obd_dev_lock);
1441                 return exports_evicted;
1442         }
1443         nid_hash = obd->obd_nid_hash;
1444         cfs_hash_getref(nid_hash);
1445         spin_unlock(&obd->obd_dev_lock);
1446
1447         do {
1448                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1449                 if (doomed_exp == NULL)
1450                         break;
1451
1452                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1453                          "nid %s found, wanted nid %s, requested nid %s\n",
1454                          obd_export_nid2str(doomed_exp),
1455                          libcfs_nid2str(nid_key), nid);
1456                 LASSERTF(doomed_exp != obd->obd_self_export,
1457                          "self-export is hashed by NID?\n");
1458                 exports_evicted++;
1459                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1460                               "request\n", obd->obd_name,
1461                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1462                               obd_export_nid2str(doomed_exp));
1463                 class_fail_export(doomed_exp);
1464                 class_export_put(doomed_exp);
1465         } while (1);
1466
1467         cfs_hash_putref(nid_hash);
1468
1469         if (!exports_evicted)
1470                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1471                        obd->obd_name, nid);
1472         return exports_evicted;
1473 }
1474 EXPORT_SYMBOL(obd_export_evict_by_nid);
1475
1476 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1477 {
1478         cfs_hash_t *uuid_hash;
1479         struct obd_export *doomed_exp = NULL;
1480         struct obd_uuid doomed_uuid;
1481         int exports_evicted = 0;
1482
1483         spin_lock(&obd->obd_dev_lock);
1484         if (obd->obd_stopping) {
1485                 spin_unlock(&obd->obd_dev_lock);
1486                 return exports_evicted;
1487         }
1488         uuid_hash = obd->obd_uuid_hash;
1489         cfs_hash_getref(uuid_hash);
1490         spin_unlock(&obd->obd_dev_lock);
1491
1492         obd_str2uuid(&doomed_uuid, uuid);
1493         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1494                 CERROR("%s: can't evict myself\n", obd->obd_name);
1495                 cfs_hash_putref(uuid_hash);
1496                 return exports_evicted;
1497         }
1498
1499         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1500
1501         if (doomed_exp == NULL) {
1502                 CERROR("%s: can't disconnect %s: no exports found\n",
1503                        obd->obd_name, uuid);
1504         } else {
1505                 CWARN("%s: evicting %s at adminstrative request\n",
1506                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1507                 class_fail_export(doomed_exp);
1508                 class_export_put(doomed_exp);
1509                 exports_evicted++;
1510         }
1511         cfs_hash_putref(uuid_hash);
1512
1513         return exports_evicted;
1514 }
1515 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1516
1517 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1518 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1519 EXPORT_SYMBOL(class_export_dump_hook);
1520 #endif
1521
1522 static void print_export_data(struct obd_export *exp, const char *status,
1523                               int locks)
1524 {
1525         struct ptlrpc_reply_state *rs;
1526         struct ptlrpc_reply_state *first_reply = NULL;
1527         int nreplies = 0;
1528
1529         spin_lock(&exp->exp_lock);
1530         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1531                             rs_exp_list) {
1532                 if (nreplies == 0)
1533                         first_reply = rs;
1534                 nreplies++;
1535         }
1536         spin_unlock(&exp->exp_lock);
1537
1538         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1539                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1540                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1541                atomic_read(&exp->exp_rpc_count),
1542                atomic_read(&exp->exp_cb_count),
1543                atomic_read(&exp->exp_locks_count),
1544                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1545                nreplies, first_reply, nreplies > 3 ? "..." : "",
1546                exp->exp_last_committed);
1547 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1548         if (locks && class_export_dump_hook != NULL)
1549                 class_export_dump_hook(exp);
1550 #endif
1551 }
1552
1553 void dump_exports(struct obd_device *obd, int locks)
1554 {
1555         struct obd_export *exp;
1556
1557         spin_lock(&obd->obd_dev_lock);
1558         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1559                 print_export_data(exp, "ACTIVE", locks);
1560         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1561                 print_export_data(exp, "UNLINKED", locks);
1562         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1563                 print_export_data(exp, "DELAYED", locks);
1564         spin_unlock(&obd->obd_dev_lock);
1565         spin_lock(&obd_zombie_impexp_lock);
1566         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1567                 print_export_data(exp, "ZOMBIE", locks);
1568         spin_unlock(&obd_zombie_impexp_lock);
1569 }
1570 EXPORT_SYMBOL(dump_exports);
1571
1572 void obd_exports_barrier(struct obd_device *obd)
1573 {
1574         int waited = 2;
1575         LASSERT(list_empty(&obd->obd_exports));
1576         spin_lock(&obd->obd_dev_lock);
1577         while (!list_empty(&obd->obd_unlinked_exports)) {
1578                 spin_unlock(&obd->obd_dev_lock);
1579                 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1580                                                    cfs_time_seconds(waited));
1581                 if (waited > 5 && IS_PO2(waited)) {
1582                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1583                                       "more than %d seconds. "
1584                                       "The obd refcount = %d. Is it stuck?\n",
1585                                       obd->obd_name, waited,
1586                                       atomic_read(&obd->obd_refcount));
1587                         dump_exports(obd, 1);
1588                 }
1589                 waited *= 2;
1590                 spin_lock(&obd->obd_dev_lock);
1591         }
1592         spin_unlock(&obd->obd_dev_lock);
1593 }
1594 EXPORT_SYMBOL(obd_exports_barrier);
1595
1596 /* Total amount of zombies to be destroyed */
1597 static int zombies_count = 0;
1598
1599 /**
1600  * kill zombie imports and exports
1601  */
1602 void obd_zombie_impexp_cull(void)
1603 {
1604         struct obd_import *import;
1605         struct obd_export *export;
1606         ENTRY;
1607
1608         do {
1609                 spin_lock(&obd_zombie_impexp_lock);
1610
1611                 import = NULL;
1612                 if (!list_empty(&obd_zombie_imports)) {
1613                         import = list_entry(obd_zombie_imports.next,
1614                                             struct obd_import,
1615                                             imp_zombie_chain);
1616                         list_del_init(&import->imp_zombie_chain);
1617                 }
1618
1619                 export = NULL;
1620                 if (!list_empty(&obd_zombie_exports)) {
1621                         export = list_entry(obd_zombie_exports.next,
1622                                             struct obd_export,
1623                                             exp_obd_chain);
1624                         list_del_init(&export->exp_obd_chain);
1625                 }
1626
1627                 spin_unlock(&obd_zombie_impexp_lock);
1628
1629                 if (import != NULL) {
1630                         class_import_destroy(import);
1631                         spin_lock(&obd_zombie_impexp_lock);
1632                         zombies_count--;
1633                         spin_unlock(&obd_zombie_impexp_lock);
1634                 }
1635
1636                 if (export != NULL) {
1637                         class_export_destroy(export);
1638                         spin_lock(&obd_zombie_impexp_lock);
1639                         zombies_count--;
1640                         spin_unlock(&obd_zombie_impexp_lock);
1641                 }
1642
1643                 cond_resched();
1644         } while (import != NULL || export != NULL);
1645         EXIT;
1646 }
1647
1648 static struct completion        obd_zombie_start;
1649 static struct completion        obd_zombie_stop;
1650 static unsigned long            obd_zombie_flags;
1651 static wait_queue_head_t        obd_zombie_waitq;
1652 static pid_t                    obd_zombie_pid;
1653
1654 enum {
1655         OBD_ZOMBIE_STOP         = 0x0001,
1656 };
1657
1658 /**
1659  * check for work for kill zombie import/export thread.
1660  */
1661 static int obd_zombie_impexp_check(void *arg)
1662 {
1663         int rc;
1664
1665         spin_lock(&obd_zombie_impexp_lock);
1666         rc = (zombies_count == 0) &&
1667              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1668         spin_unlock(&obd_zombie_impexp_lock);
1669
1670         RETURN(rc);
1671 }
1672
1673 /**
1674  * Add export to the obd_zombe thread and notify it.
1675  */
1676 static void obd_zombie_export_add(struct obd_export *exp) {
1677         spin_lock(&exp->exp_obd->obd_dev_lock);
1678         LASSERT(!list_empty(&exp->exp_obd_chain));
1679         list_del_init(&exp->exp_obd_chain);
1680         spin_unlock(&exp->exp_obd->obd_dev_lock);
1681         spin_lock(&obd_zombie_impexp_lock);
1682         zombies_count++;
1683         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1684         spin_unlock(&obd_zombie_impexp_lock);
1685
1686         obd_zombie_impexp_notify();
1687 }
1688
1689 /**
1690  * Add import to the obd_zombe thread and notify it.
1691  */
1692 static void obd_zombie_import_add(struct obd_import *imp) {
1693         LASSERT(imp->imp_sec == NULL);
1694         LASSERT(imp->imp_rq_pool == NULL);
1695         spin_lock(&obd_zombie_impexp_lock);
1696         LASSERT(list_empty(&imp->imp_zombie_chain));
1697         zombies_count++;
1698         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1699         spin_unlock(&obd_zombie_impexp_lock);
1700
1701         obd_zombie_impexp_notify();
1702 }
1703
1704 /**
1705  * notify import/export destroy thread about new zombie.
1706  */
1707 static void obd_zombie_impexp_notify(void)
1708 {
1709         /*
1710          * Make sure obd_zomebie_impexp_thread get this notification.
1711          * It is possible this signal only get by obd_zombie_barrier, and
1712          * barrier gulps this notification and sleeps away and hangs ensues
1713          */
1714         wake_up_all(&obd_zombie_waitq);
1715 }
1716
1717 /**
1718  * check whether obd_zombie is idle
1719  */
1720 static int obd_zombie_is_idle(void)
1721 {
1722         int rc;
1723
1724         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1725         spin_lock(&obd_zombie_impexp_lock);
1726         rc = (zombies_count == 0);
1727         spin_unlock(&obd_zombie_impexp_lock);
1728         return rc;
1729 }
1730
1731 /**
1732  * wait when obd_zombie import/export queues become empty
1733  */
1734 void obd_zombie_barrier(void)
1735 {
1736         struct l_wait_info lwi = { 0 };
1737
1738         if (obd_zombie_pid == current_pid())
1739                 /* don't wait for myself */
1740                 return;
1741         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1742 }
1743 EXPORT_SYMBOL(obd_zombie_barrier);
1744
1745
1746 /**
1747  * destroy zombie export/import thread.
1748  */
1749 static int obd_zombie_impexp_thread(void *unused)
1750 {
1751         unshare_fs_struct();
1752         complete(&obd_zombie_start);
1753
1754         obd_zombie_pid = current_pid();
1755
1756         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1757                 struct l_wait_info lwi = { 0 };
1758
1759                 l_wait_event(obd_zombie_waitq,
1760                              !obd_zombie_impexp_check(NULL), &lwi);
1761                 obd_zombie_impexp_cull();
1762
1763                 /*
1764                  * Notify obd_zombie_barrier callers that queues
1765                  * may be empty.
1766                  */
1767                 wake_up(&obd_zombie_waitq);
1768         }
1769
1770         complete(&obd_zombie_stop);
1771
1772         RETURN(0);
1773 }
1774
1775
1776 /**
1777  * start destroy zombie import/export thread
1778  */
1779 int obd_zombie_impexp_init(void)
1780 {
1781         struct task_struct *task;
1782
1783         INIT_LIST_HEAD(&obd_zombie_imports);
1784
1785         INIT_LIST_HEAD(&obd_zombie_exports);
1786         spin_lock_init(&obd_zombie_impexp_lock);
1787         init_completion(&obd_zombie_start);
1788         init_completion(&obd_zombie_stop);
1789         init_waitqueue_head(&obd_zombie_waitq);
1790         obd_zombie_pid = 0;
1791
1792         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1793         if (IS_ERR(task))
1794                 RETURN(PTR_ERR(task));
1795
1796         wait_for_completion(&obd_zombie_start);
1797         RETURN(0);
1798 }
1799 /**
1800  * stop destroy zombie import/export thread
1801  */
1802 void obd_zombie_impexp_stop(void)
1803 {
1804         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1805         obd_zombie_impexp_notify();
1806         wait_for_completion(&obd_zombie_stop);
1807 }
1808
1809 /***** Kernel-userspace comm helpers *******/
1810
1811 /* Get length of entire message, including header */
1812 int kuc_len(int payload_len)
1813 {
1814         return sizeof(struct kuc_hdr) + payload_len;
1815 }
1816 EXPORT_SYMBOL(kuc_len);
1817
1818 /* Get a pointer to kuc header, given a ptr to the payload
1819  * @param p Pointer to payload area
1820  * @returns Pointer to kuc header
1821  */
1822 struct kuc_hdr * kuc_ptr(void *p)
1823 {
1824         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1825         LASSERT(lh->kuc_magic == KUC_MAGIC);
1826         return lh;
1827 }
1828 EXPORT_SYMBOL(kuc_ptr);
1829
1830 /* Test if payload is part of kuc message
1831  * @param p Pointer to payload area
1832  * @returns boolean
1833  */
1834 int kuc_ispayload(void *p)
1835 {
1836         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1837
1838         if (kh->kuc_magic == KUC_MAGIC)
1839                 return 1;
1840         else
1841                 return 0;
1842 }
1843 EXPORT_SYMBOL(kuc_ispayload);
1844
1845 /* Alloc space for a message, and fill in header
1846  * @return Pointer to payload area
1847  */
1848 void *kuc_alloc(int payload_len, int transport, int type)
1849 {
1850         struct kuc_hdr *lh;
1851         int len = kuc_len(payload_len);
1852
1853         OBD_ALLOC(lh, len);
1854         if (lh == NULL)
1855                 return ERR_PTR(-ENOMEM);
1856
1857         lh->kuc_magic = KUC_MAGIC;
1858         lh->kuc_transport = transport;
1859         lh->kuc_msgtype = type;
1860         lh->kuc_msglen = len;
1861
1862         return (void *)(lh + 1);
1863 }
1864 EXPORT_SYMBOL(kuc_alloc);
1865
1866 /* Takes pointer to payload area */
1867 inline void kuc_free(void *p, int payload_len)
1868 {
1869         struct kuc_hdr *lh = kuc_ptr(p);
1870         OBD_FREE(lh, kuc_len(payload_len));
1871 }
1872 EXPORT_SYMBOL(kuc_free);
1873
1874 struct obd_request_slot_waiter {
1875         struct list_head        orsw_entry;
1876         wait_queue_head_t       orsw_waitq;
1877         bool                    orsw_signaled;
1878 };
1879
1880 static bool obd_request_slot_avail(struct client_obd *cli,
1881                                    struct obd_request_slot_waiter *orsw)
1882 {
1883         bool avail;
1884
1885         spin_lock(&cli->cl_loi_list_lock);
1886         avail = !!list_empty(&orsw->orsw_entry);
1887         spin_unlock(&cli->cl_loi_list_lock);
1888
1889         return avail;
1890 };
1891
1892 /*
1893  * For network flow control, the RPC sponsor needs to acquire a credit
1894  * before sending the RPC. The credits count for a connection is defined
1895  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1896  * the subsequent RPC sponsors need to wait until others released their
1897  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1898  */
1899 int obd_get_request_slot(struct client_obd *cli)
1900 {
1901         struct obd_request_slot_waiter   orsw;
1902         struct l_wait_info               lwi;
1903         int                              rc;
1904
1905         spin_lock(&cli->cl_loi_list_lock);
1906         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1907                 cli->cl_r_in_flight++;
1908                 spin_unlock(&cli->cl_loi_list_lock);
1909                 return 0;
1910         }
1911
1912         init_waitqueue_head(&orsw.orsw_waitq);
1913         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1914         orsw.orsw_signaled = false;
1915         spin_unlock(&cli->cl_loi_list_lock);
1916
1917         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1918         rc = l_wait_event(orsw.orsw_waitq,
1919                           obd_request_slot_avail(cli, &orsw) ||
1920                           orsw.orsw_signaled,
1921                           &lwi);
1922
1923         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1924          * freed but other (such as obd_put_request_slot) is using it. */
1925         spin_lock(&cli->cl_loi_list_lock);
1926         if (rc != 0) {
1927                 if (!orsw.orsw_signaled) {
1928                         if (list_empty(&orsw.orsw_entry))
1929                                 cli->cl_r_in_flight--;
1930                         else
1931                                 list_del(&orsw.orsw_entry);
1932                 }
1933         }
1934
1935         if (orsw.orsw_signaled) {
1936                 LASSERT(list_empty(&orsw.orsw_entry));
1937
1938                 rc = -EINTR;
1939         }
1940         spin_unlock(&cli->cl_loi_list_lock);
1941
1942         return rc;
1943 }
1944 EXPORT_SYMBOL(obd_get_request_slot);
1945
1946 void obd_put_request_slot(struct client_obd *cli)
1947 {
1948         struct obd_request_slot_waiter *orsw;
1949
1950         spin_lock(&cli->cl_loi_list_lock);
1951         cli->cl_r_in_flight--;
1952
1953         /* If there is free slot, wakeup the first waiter. */
1954         if (!list_empty(&cli->cl_loi_read_list) &&
1955             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
1956                 orsw = list_entry(cli->cl_loi_read_list.next,
1957                                   struct obd_request_slot_waiter, orsw_entry);
1958                 list_del_init(&orsw->orsw_entry);
1959                 cli->cl_r_in_flight++;
1960                 wake_up(&orsw->orsw_waitq);
1961         }
1962         spin_unlock(&cli->cl_loi_list_lock);
1963 }
1964 EXPORT_SYMBOL(obd_put_request_slot);
1965
1966 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1967 {
1968         return cli->cl_max_rpcs_in_flight;
1969 }
1970 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
1971
1972 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1973 {
1974         struct obd_request_slot_waiter *orsw;
1975         __u32                           old;
1976         int                             diff;
1977         int                             i;
1978
1979         if (max > OBD_MAX_RIF_MAX || max < 1)
1980                 return -ERANGE;
1981
1982         spin_lock(&cli->cl_loi_list_lock);
1983         old = cli->cl_max_rpcs_in_flight;
1984         cli->cl_max_rpcs_in_flight = max;
1985         diff = max - old;
1986
1987         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
1988         for (i = 0; i < diff; i++) {
1989                 if (list_empty(&cli->cl_loi_read_list))
1990                         break;
1991
1992                 orsw = list_entry(cli->cl_loi_read_list.next,
1993                                   struct obd_request_slot_waiter, orsw_entry);
1994                 list_del_init(&orsw->orsw_entry);
1995                 cli->cl_r_in_flight++;
1996                 wake_up(&orsw->orsw_waitq);
1997         }
1998         spin_unlock(&cli->cl_loi_list_lock);
1999
2000         return 0;
2001 }
2002 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);