Whamcloud - gitweb
LU-4788 lfsck: replace cfs_list_t with list_head
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include <obd_class.h>
44 #include <lprocfs_status.h>
45
46 spinlock_t obd_types_lock;
47
48 struct kmem_cache *obd_device_cachep;
49 struct kmem_cache *obdo_cachep;
50 EXPORT_SYMBOL(obdo_cachep);
51 struct kmem_cache *import_cachep;
52
53 struct list_head obd_zombie_imports;
54 struct list_head obd_zombie_exports;
55 spinlock_t  obd_zombie_impexp_lock;
56 static void obd_zombie_impexp_notify(void);
57 static void obd_zombie_export_add(struct obd_export *exp);
58 static void obd_zombie_import_add(struct obd_import *imp);
59 static void print_export_data(struct obd_export *exp,
60                               const char *status, int locks);
61
62 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
63 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
64
65 /*
66  * support functions: we could use inter-module communication, but this
67  * is more portable to other OS's
68  */
69 static struct obd_device *obd_device_alloc(void)
70 {
71         struct obd_device *obd;
72
73         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
74         if (obd != NULL) {
75                 obd->obd_magic = OBD_DEVICE_MAGIC;
76         }
77         return obd;
78 }
79
80 static void obd_device_free(struct obd_device *obd)
81 {
82         LASSERT(obd != NULL);
83         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
84                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
85         if (obd->obd_namespace != NULL) {
86                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
87                        obd, obd->obd_namespace, obd->obd_force);
88                 LBUG();
89         }
90         lu_ref_fini(&obd->obd_reference);
91         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
92 }
93
94 struct obd_type *class_search_type(const char *name)
95 {
96         struct list_head *tmp;
97         struct obd_type *type;
98
99         spin_lock(&obd_types_lock);
100         list_for_each(tmp, &obd_types) {
101                 type = list_entry(tmp, struct obd_type, typ_chain);
102                 if (strcmp(type->typ_name, name) == 0) {
103                         spin_unlock(&obd_types_lock);
104                         return type;
105                 }
106         }
107         spin_unlock(&obd_types_lock);
108         return NULL;
109 }
110 EXPORT_SYMBOL(class_search_type);
111
112 struct obd_type *class_get_type(const char *name)
113 {
114         struct obd_type *type = class_search_type(name);
115
116 #ifdef HAVE_MODULE_LOADING_SUPPORT
117         if (!type) {
118                 const char *modname = name;
119
120                 if (strcmp(modname, "obdfilter") == 0)
121                         modname = "ofd";
122
123                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
124                         modname = LUSTRE_OSP_NAME;
125
126                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
127                         modname = LUSTRE_MDT_NAME;
128
129                 if (!request_module("%s", modname)) {
130                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131                         type = class_search_type(name);
132                 } else {
133                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134                                            modname);
135                 }
136         }
137 #endif
138         if (type) {
139                 spin_lock(&type->obd_type_lock);
140                 type->typ_refcnt++;
141                 try_module_get(type->typ_dt_ops->o_owner);
142                 spin_unlock(&type->obd_type_lock);
143         }
144         return type;
145 }
146 EXPORT_SYMBOL(class_get_type);
147
148 void class_put_type(struct obd_type *type)
149 {
150         LASSERT(type);
151         spin_lock(&type->obd_type_lock);
152         type->typ_refcnt--;
153         module_put(type->typ_dt_ops->o_owner);
154         spin_unlock(&type->obd_type_lock);
155 }
156 EXPORT_SYMBOL(class_put_type);
157
158 #define CLASS_MAX_NAME 1024
159
160 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
161                         bool enable_proc, struct lprocfs_seq_vars *module_vars,
162 #ifndef HAVE_ONLY_PROCFS_SEQ
163                         struct lprocfs_vars *vars,
164 #endif
165                         const char *name, struct lu_device_type *ldt)
166 {
167         struct obd_type *type;
168         int rc = 0;
169         ENTRY;
170
171         /* sanity check */
172         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
173
174         if (class_search_type(name)) {
175                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
176                 RETURN(-EEXIST);
177         }
178
179         rc = -ENOMEM;
180         OBD_ALLOC(type, sizeof(*type));
181         if (type == NULL)
182                 RETURN(rc);
183
184         OBD_ALLOC_PTR(type->typ_dt_ops);
185         OBD_ALLOC_PTR(type->typ_md_ops);
186         OBD_ALLOC(type->typ_name, strlen(name) + 1);
187
188         if (type->typ_dt_ops == NULL ||
189             type->typ_md_ops == NULL ||
190             type->typ_name == NULL)
191                 GOTO (failed, rc);
192
193         *(type->typ_dt_ops) = *dt_ops;
194         /* md_ops is optional */
195         if (md_ops)
196                 *(type->typ_md_ops) = *md_ops;
197         strcpy(type->typ_name, name);
198         spin_lock_init(&type->obd_type_lock);
199
200 #ifdef LPROCFS
201         if (enable_proc) {
202 #ifndef HAVE_ONLY_PROCFS_SEQ
203                 if (vars) {
204                         type->typ_procroot = lprocfs_register(type->typ_name,
205                                                               proc_lustre_root,
206                                                               vars, type);
207                 } else
208 #endif
209                 {
210                         type->typ_procroot = lprocfs_seq_register(type->typ_name,
211                                                                   proc_lustre_root,
212                                                                   module_vars, type);
213                 }
214                 if (IS_ERR(type->typ_procroot)) {
215                         rc = PTR_ERR(type->typ_procroot);
216                         type->typ_procroot = NULL;
217                         GOTO(failed, rc);
218                 }
219         }
220 #endif
221         if (ldt != NULL) {
222                 type->typ_lu = ldt;
223                 rc = lu_device_type_init(ldt);
224                 if (rc != 0)
225                         GOTO (failed, rc);
226         }
227
228         spin_lock(&obd_types_lock);
229         list_add(&type->typ_chain, &obd_types);
230         spin_unlock(&obd_types_lock);
231
232         RETURN (0);
233
234 failed:
235         if (type->typ_name != NULL) {
236 #ifdef LPROCFS
237                 if (type->typ_procroot != NULL) {
238 #ifndef HAVE_ONLY_PROCFS_SEQ
239                         lprocfs_try_remove_proc_entry(type->typ_name,
240                                                       proc_lustre_root);
241 #else
242                         remove_proc_subtree(type->typ_name, proc_lustre_root);
243 #endif
244                 }
245 #endif
246                 OBD_FREE(type->typ_name, strlen(name) + 1);
247         }
248         if (type->typ_md_ops != NULL)
249                 OBD_FREE_PTR(type->typ_md_ops);
250         if (type->typ_dt_ops != NULL)
251                 OBD_FREE_PTR(type->typ_dt_ops);
252         OBD_FREE(type, sizeof(*type));
253         RETURN(rc);
254 }
255 EXPORT_SYMBOL(class_register_type);
256
257 int class_unregister_type(const char *name)
258 {
259         struct obd_type *type = class_search_type(name);
260         ENTRY;
261
262         if (!type) {
263                 CERROR("unknown obd type\n");
264                 RETURN(-EINVAL);
265         }
266
267         if (type->typ_refcnt) {
268                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
269                 /* This is a bad situation, let's make the best of it */
270                 /* Remove ops, but leave the name for debugging */
271                 OBD_FREE_PTR(type->typ_dt_ops);
272                 OBD_FREE_PTR(type->typ_md_ops);
273                 RETURN(-EBUSY);
274         }
275
276         /* we do not use type->typ_procroot as for compatibility purposes
277          * other modules can share names (i.e. lod can use lov entry). so
278          * we can't reference pointer as it can get invalided when another
279          * module removes the entry */
280 #ifdef LPROCFS
281         if (type->typ_procroot != NULL) {
282 #ifndef HAVE_ONLY_PROCFS_SEQ
283                 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
284 #else
285                 remove_proc_subtree(type->typ_name, proc_lustre_root);
286 #endif
287         }
288
289         if (type->typ_procsym != NULL)
290                 lprocfs_remove(&type->typ_procsym);
291 #endif
292         if (type->typ_lu)
293                 lu_device_type_fini(type->typ_lu);
294
295         spin_lock(&obd_types_lock);
296         list_del(&type->typ_chain);
297         spin_unlock(&obd_types_lock);
298         OBD_FREE(type->typ_name, strlen(name) + 1);
299         if (type->typ_dt_ops != NULL)
300                 OBD_FREE_PTR(type->typ_dt_ops);
301         if (type->typ_md_ops != NULL)
302                 OBD_FREE_PTR(type->typ_md_ops);
303         OBD_FREE(type, sizeof(*type));
304         RETURN(0);
305 } /* class_unregister_type */
306 EXPORT_SYMBOL(class_unregister_type);
307
308 /**
309  * Create a new obd device.
310  *
311  * Find an empty slot in ::obd_devs[], create a new obd device in it.
312  *
313  * \param[in] type_name obd device type string.
314  * \param[in] name      obd device name.
315  *
316  * \retval NULL if create fails, otherwise return the obd device
317  *         pointer created.
318  */
319 struct obd_device *class_newdev(const char *type_name, const char *name)
320 {
321         struct obd_device *result = NULL;
322         struct obd_device *newdev;
323         struct obd_type *type = NULL;
324         int i;
325         int new_obd_minor = 0;
326         ENTRY;
327
328         if (strlen(name) >= MAX_OBD_NAME) {
329                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
330                 RETURN(ERR_PTR(-EINVAL));
331         }
332
333         type = class_get_type(type_name);
334         if (type == NULL){
335                 CERROR("OBD: unknown type: %s\n", type_name);
336                 RETURN(ERR_PTR(-ENODEV));
337         }
338
339         newdev = obd_device_alloc();
340         if (newdev == NULL)
341                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
342
343         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
344
345         write_lock(&obd_dev_lock);
346         for (i = 0; i < class_devno_max(); i++) {
347                 struct obd_device *obd = class_num2obd(i);
348
349                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
350                         CERROR("Device %s already exists at %d, won't add\n",
351                                name, i);
352                         if (result) {
353                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
354                                          "%p obd_magic %08x != %08x\n", result,
355                                          result->obd_magic, OBD_DEVICE_MAGIC);
356                                 LASSERTF(result->obd_minor == new_obd_minor,
357                                          "%p obd_minor %d != %d\n", result,
358                                          result->obd_minor, new_obd_minor);
359
360                                 obd_devs[result->obd_minor] = NULL;
361                                 result->obd_name[0]='\0';
362                          }
363                         result = ERR_PTR(-EEXIST);
364                         break;
365                 }
366                 if (!result && !obd) {
367                         result = newdev;
368                         result->obd_minor = i;
369                         new_obd_minor = i;
370                         result->obd_type = type;
371                         strncpy(result->obd_name, name,
372                                 sizeof(result->obd_name) - 1);
373                         obd_devs[i] = result;
374                 }
375         }
376         write_unlock(&obd_dev_lock);
377
378         if (result == NULL && i >= class_devno_max()) {
379                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
380                        class_devno_max());
381                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
382         }
383
384         if (IS_ERR(result))
385                 GOTO(out, result);
386
387         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
388                result->obd_name, result);
389
390         RETURN(result);
391 out:
392         obd_device_free(newdev);
393 out_type:
394         class_put_type(type);
395         return result;
396 }
397
398 void class_release_dev(struct obd_device *obd)
399 {
400         struct obd_type *obd_type = obd->obd_type;
401
402         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
403                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
404         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
405                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
406         LASSERT(obd_type != NULL);
407
408         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
409                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
410
411         write_lock(&obd_dev_lock);
412         obd_devs[obd->obd_minor] = NULL;
413         write_unlock(&obd_dev_lock);
414         obd_device_free(obd);
415
416         class_put_type(obd_type);
417 }
418
419 int class_name2dev(const char *name)
420 {
421         int i;
422
423         if (!name)
424                 return -1;
425
426         read_lock(&obd_dev_lock);
427         for (i = 0; i < class_devno_max(); i++) {
428                 struct obd_device *obd = class_num2obd(i);
429
430                 if (obd && strcmp(name, obd->obd_name) == 0) {
431                         /* Make sure we finished attaching before we give
432                            out any references */
433                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
434                         if (obd->obd_attached) {
435                                 read_unlock(&obd_dev_lock);
436                                 return i;
437                         }
438                         break;
439                 }
440         }
441         read_unlock(&obd_dev_lock);
442
443         return -1;
444 }
445 EXPORT_SYMBOL(class_name2dev);
446
447 struct obd_device *class_name2obd(const char *name)
448 {
449         int dev = class_name2dev(name);
450
451         if (dev < 0 || dev > class_devno_max())
452                 return NULL;
453         return class_num2obd(dev);
454 }
455 EXPORT_SYMBOL(class_name2obd);
456
457 int class_uuid2dev(struct obd_uuid *uuid)
458 {
459         int i;
460
461         read_lock(&obd_dev_lock);
462         for (i = 0; i < class_devno_max(); i++) {
463                 struct obd_device *obd = class_num2obd(i);
464
465                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
466                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
467                         read_unlock(&obd_dev_lock);
468                         return i;
469                 }
470         }
471         read_unlock(&obd_dev_lock);
472
473         return -1;
474 }
475 EXPORT_SYMBOL(class_uuid2dev);
476
477 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
478 {
479         int dev = class_uuid2dev(uuid);
480         if (dev < 0)
481                 return NULL;
482         return class_num2obd(dev);
483 }
484 EXPORT_SYMBOL(class_uuid2obd);
485
486 /**
487  * Get obd device from ::obd_devs[]
488  *
489  * \param num [in] array index
490  *
491  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
492  *         otherwise return the obd device there.
493  */
494 struct obd_device *class_num2obd(int num)
495 {
496         struct obd_device *obd = NULL;
497
498         if (num < class_devno_max()) {
499                 obd = obd_devs[num];
500                 if (obd == NULL)
501                         return NULL;
502
503                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
504                          "%p obd_magic %08x != %08x\n",
505                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
506                 LASSERTF(obd->obd_minor == num,
507                          "%p obd_minor %0d != %0d\n",
508                          obd, obd->obd_minor, num);
509         }
510
511         return obd;
512 }
513 EXPORT_SYMBOL(class_num2obd);
514
515 /**
516  * Get obd devices count. Device in any
517  *    state are counted
518  * \retval obd device count
519  */
520 int get_devices_count(void)
521 {
522         int index, max_index = class_devno_max(), dev_count = 0;
523
524         read_lock(&obd_dev_lock);
525         for (index = 0; index <= max_index; index++) {
526                 struct obd_device *obd = class_num2obd(index);
527                 if (obd != NULL)
528                         dev_count++;
529         }
530         read_unlock(&obd_dev_lock);
531
532         return dev_count;
533 }
534 EXPORT_SYMBOL(get_devices_count);
535
536 void class_obd_list(void)
537 {
538         char *status;
539         int i;
540
541         read_lock(&obd_dev_lock);
542         for (i = 0; i < class_devno_max(); i++) {
543                 struct obd_device *obd = class_num2obd(i);
544
545                 if (obd == NULL)
546                         continue;
547                 if (obd->obd_stopping)
548                         status = "ST";
549                 else if (obd->obd_set_up)
550                         status = "UP";
551                 else if (obd->obd_attached)
552                         status = "AT";
553                 else
554                         status = "--";
555                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
556                          i, status, obd->obd_type->typ_name,
557                          obd->obd_name, obd->obd_uuid.uuid,
558                          atomic_read(&obd->obd_refcount));
559         }
560         read_unlock(&obd_dev_lock);
561         return;
562 }
563
564 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
565    specified, then only the client with that uuid is returned,
566    otherwise any client connected to the tgt is returned. */
567 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
568                                           const char * typ_name,
569                                           struct obd_uuid *grp_uuid)
570 {
571         int i;
572
573         read_lock(&obd_dev_lock);
574         for (i = 0; i < class_devno_max(); i++) {
575                 struct obd_device *obd = class_num2obd(i);
576
577                 if (obd == NULL)
578                         continue;
579                 if ((strncmp(obd->obd_type->typ_name, typ_name,
580                              strlen(typ_name)) == 0)) {
581                         if (obd_uuid_equals(tgt_uuid,
582                                             &obd->u.cli.cl_target_uuid) &&
583                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
584                                                          &obd->obd_uuid) : 1)) {
585                                 read_unlock(&obd_dev_lock);
586                                 return obd;
587                         }
588                 }
589         }
590         read_unlock(&obd_dev_lock);
591
592         return NULL;
593 }
594 EXPORT_SYMBOL(class_find_client_obd);
595
596 /* Iterate the obd_device list looking devices have grp_uuid. Start
597    searching at *next, and if a device is found, the next index to look
598    at is saved in *next. If next is NULL, then the first matching device
599    will always be returned. */
600 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
601 {
602         int i;
603
604         if (next == NULL)
605                 i = 0;
606         else if (*next >= 0 && *next < class_devno_max())
607                 i = *next;
608         else
609                 return NULL;
610
611         read_lock(&obd_dev_lock);
612         for (; i < class_devno_max(); i++) {
613                 struct obd_device *obd = class_num2obd(i);
614
615                 if (obd == NULL)
616                         continue;
617                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
618                         if (next != NULL)
619                                 *next = i+1;
620                         read_unlock(&obd_dev_lock);
621                         return obd;
622                 }
623         }
624         read_unlock(&obd_dev_lock);
625
626         return NULL;
627 }
628 EXPORT_SYMBOL(class_devices_in_group);
629
630 /**
631  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
632  * adjust sptlrpc settings accordingly.
633  */
634 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
635 {
636         struct obd_device  *obd;
637         const char         *type;
638         int                 i, rc = 0, rc2;
639
640         LASSERT(namelen > 0);
641
642         read_lock(&obd_dev_lock);
643         for (i = 0; i < class_devno_max(); i++) {
644                 obd = class_num2obd(i);
645
646                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
647                         continue;
648
649                 /* only notify mdc, osc, mdt, ost */
650                 type = obd->obd_type->typ_name;
651                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
652                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
653                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
654                     strcmp(type, LUSTRE_OST_NAME) != 0)
655                         continue;
656
657                 if (strncmp(obd->obd_name, fsname, namelen))
658                         continue;
659
660                 class_incref(obd, __FUNCTION__, obd);
661                 read_unlock(&obd_dev_lock);
662                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
663                                          sizeof(KEY_SPTLRPC_CONF),
664                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
665                 rc = rc ? rc : rc2;
666                 class_decref(obd, __FUNCTION__, obd);
667                 read_lock(&obd_dev_lock);
668         }
669         read_unlock(&obd_dev_lock);
670         return rc;
671 }
672 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
673
674 void obd_cleanup_caches(void)
675 {
676         ENTRY;
677         if (obd_device_cachep) {
678                 kmem_cache_destroy(obd_device_cachep);
679                 obd_device_cachep = NULL;
680         }
681         if (obdo_cachep) {
682                 kmem_cache_destroy(obdo_cachep);
683                 obdo_cachep = NULL;
684         }
685         if (import_cachep) {
686                 kmem_cache_destroy(import_cachep);
687                 import_cachep = NULL;
688         }
689         if (capa_cachep) {
690                 kmem_cache_destroy(capa_cachep);
691                 capa_cachep = NULL;
692         }
693         EXIT;
694 }
695
696 int obd_init_caches(void)
697 {
698         int rc;
699         ENTRY;
700
701         LASSERT(obd_device_cachep == NULL);
702         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
703                                               sizeof(struct obd_device),
704                                               0, 0, NULL);
705         if (!obd_device_cachep)
706                 GOTO(out, rc = -ENOMEM);
707
708         LASSERT(obdo_cachep == NULL);
709         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
710                                         0, 0, NULL);
711         if (!obdo_cachep)
712                 GOTO(out, rc = -ENOMEM);
713
714         LASSERT(import_cachep == NULL);
715         import_cachep = kmem_cache_create("ll_import_cache",
716                                           sizeof(struct obd_import),
717                                           0, 0, NULL);
718         if (!import_cachep)
719                 GOTO(out, rc = -ENOMEM);
720
721         LASSERT(capa_cachep == NULL);
722         capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
723                                         0, 0, NULL);
724         if (!capa_cachep)
725                 GOTO(out, rc = -ENOMEM);
726
727         RETURN(0);
728 out:
729         obd_cleanup_caches();
730         RETURN(rc);
731 }
732
733 /* map connection to client */
734 struct obd_export *class_conn2export(struct lustre_handle *conn)
735 {
736         struct obd_export *export;
737         ENTRY;
738
739         if (!conn) {
740                 CDEBUG(D_CACHE, "looking for null handle\n");
741                 RETURN(NULL);
742         }
743
744         if (conn->cookie == -1) {  /* this means assign a new connection */
745                 CDEBUG(D_CACHE, "want a new connection\n");
746                 RETURN(NULL);
747         }
748
749         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
750         export = class_handle2object(conn->cookie, NULL);
751         RETURN(export);
752 }
753 EXPORT_SYMBOL(class_conn2export);
754
755 struct obd_device *class_exp2obd(struct obd_export *exp)
756 {
757         if (exp)
758                 return exp->exp_obd;
759         return NULL;
760 }
761 EXPORT_SYMBOL(class_exp2obd);
762
763 struct obd_device *class_conn2obd(struct lustre_handle *conn)
764 {
765         struct obd_export *export;
766         export = class_conn2export(conn);
767         if (export) {
768                 struct obd_device *obd = export->exp_obd;
769                 class_export_put(export);
770                 return obd;
771         }
772         return NULL;
773 }
774 EXPORT_SYMBOL(class_conn2obd);
775
776 struct obd_import *class_exp2cliimp(struct obd_export *exp)
777 {
778         struct obd_device *obd = exp->exp_obd;
779         if (obd == NULL)
780                 return NULL;
781         return obd->u.cli.cl_import;
782 }
783 EXPORT_SYMBOL(class_exp2cliimp);
784
785 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
786 {
787         struct obd_device *obd = class_conn2obd(conn);
788         if (obd == NULL)
789                 return NULL;
790         return obd->u.cli.cl_import;
791 }
792 EXPORT_SYMBOL(class_conn2cliimp);
793
794 /* Export management functions */
795 static void class_export_destroy(struct obd_export *exp)
796 {
797         struct obd_device *obd = exp->exp_obd;
798         ENTRY;
799
800         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
801         LASSERT(obd != NULL);
802
803         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
804                exp->exp_client_uuid.uuid, obd->obd_name);
805
806         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
807         if (exp->exp_connection)
808                 ptlrpc_put_connection_superhack(exp->exp_connection);
809
810         LASSERT(list_empty(&exp->exp_outstanding_replies));
811         LASSERT(list_empty(&exp->exp_uncommitted_replies));
812         LASSERT(list_empty(&exp->exp_req_replay_queue));
813         LASSERT(list_empty(&exp->exp_hp_rpcs));
814         obd_destroy_export(exp);
815         class_decref(obd, "export", exp);
816
817         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
818         EXIT;
819 }
820
821 static void export_handle_addref(void *export)
822 {
823         class_export_get(export);
824 }
825
826 static struct portals_handle_ops export_handle_ops = {
827         .hop_addref = export_handle_addref,
828         .hop_free   = NULL,
829 };
830
831 struct obd_export *class_export_get(struct obd_export *exp)
832 {
833         atomic_inc(&exp->exp_refcount);
834         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
835                atomic_read(&exp->exp_refcount));
836         return exp;
837 }
838 EXPORT_SYMBOL(class_export_get);
839
840 void class_export_put(struct obd_export *exp)
841 {
842         LASSERT(exp != NULL);
843         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
844         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
845                atomic_read(&exp->exp_refcount) - 1);
846
847         if (atomic_dec_and_test(&exp->exp_refcount)) {
848                 LASSERT(!list_empty(&exp->exp_obd_chain));
849                 CDEBUG(D_IOCTL, "final put %p/%s\n",
850                        exp, exp->exp_client_uuid.uuid);
851
852                 /* release nid stat refererence */
853                 lprocfs_exp_cleanup(exp);
854
855                 obd_zombie_export_add(exp);
856         }
857 }
858 EXPORT_SYMBOL(class_export_put);
859
860 /* Creates a new export, adds it to the hash table, and returns a
861  * pointer to it. The refcount is 2: one for the hash reference, and
862  * one for the pointer returned by this function. */
863 struct obd_export *class_new_export(struct obd_device *obd,
864                                     struct obd_uuid *cluuid)
865 {
866         struct obd_export *export;
867         cfs_hash_t *hash = NULL;
868         int rc = 0;
869         ENTRY;
870
871         OBD_ALLOC_PTR(export);
872         if (!export)
873                 return ERR_PTR(-ENOMEM);
874
875         export->exp_conn_cnt = 0;
876         export->exp_lock_hash = NULL;
877         export->exp_flock_hash = NULL;
878         atomic_set(&export->exp_refcount, 2);
879         atomic_set(&export->exp_rpc_count, 0);
880         atomic_set(&export->exp_cb_count, 0);
881         atomic_set(&export->exp_locks_count, 0);
882 #if LUSTRE_TRACKS_LOCK_EXP_REFS
883         INIT_LIST_HEAD(&export->exp_locks_list);
884         spin_lock_init(&export->exp_locks_list_guard);
885 #endif
886         atomic_set(&export->exp_replay_count, 0);
887         export->exp_obd = obd;
888         INIT_LIST_HEAD(&export->exp_outstanding_replies);
889         spin_lock_init(&export->exp_uncommitted_replies_lock);
890         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
891         INIT_LIST_HEAD(&export->exp_req_replay_queue);
892         INIT_LIST_HEAD(&export->exp_handle.h_link);
893         INIT_LIST_HEAD(&export->exp_hp_rpcs);
894         INIT_LIST_HEAD(&export->exp_reg_rpcs);
895         class_handle_hash(&export->exp_handle, &export_handle_ops);
896         export->exp_last_request_time = cfs_time_current_sec();
897         spin_lock_init(&export->exp_lock);
898         spin_lock_init(&export->exp_rpc_lock);
899         INIT_HLIST_NODE(&export->exp_uuid_hash);
900         INIT_HLIST_NODE(&export->exp_nid_hash);
901         spin_lock_init(&export->exp_bl_list_lock);
902         INIT_LIST_HEAD(&export->exp_bl_list);
903
904         export->exp_sp_peer = LUSTRE_SP_ANY;
905         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
906         export->exp_client_uuid = *cluuid;
907         obd_init_export(export);
908
909         spin_lock(&obd->obd_dev_lock);
910         /* shouldn't happen, but might race */
911         if (obd->obd_stopping)
912                 GOTO(exit_unlock, rc = -ENODEV);
913
914         hash = cfs_hash_getref(obd->obd_uuid_hash);
915         if (hash == NULL)
916                 GOTO(exit_unlock, rc = -ENODEV);
917         spin_unlock(&obd->obd_dev_lock);
918
919         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
920                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
921                 if (rc != 0) {
922                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
923                                       obd->obd_name, cluuid->uuid, rc);
924                         GOTO(exit_err, rc = -EALREADY);
925                 }
926         }
927
928         spin_lock(&obd->obd_dev_lock);
929         if (obd->obd_stopping) {
930                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
931                 GOTO(exit_unlock, rc = -ENODEV);
932         }
933
934         class_incref(obd, "export", export);
935         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
936         list_add_tail(&export->exp_obd_chain_timed,
937                       &export->exp_obd->obd_exports_timed);
938         export->exp_obd->obd_num_exports++;
939         spin_unlock(&obd->obd_dev_lock);
940         cfs_hash_putref(hash);
941         RETURN(export);
942
943 exit_unlock:
944         spin_unlock(&obd->obd_dev_lock);
945 exit_err:
946         if (hash)
947                 cfs_hash_putref(hash);
948         class_handle_unhash(&export->exp_handle);
949         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
950         obd_destroy_export(export);
951         OBD_FREE_PTR(export);
952         return ERR_PTR(rc);
953 }
954 EXPORT_SYMBOL(class_new_export);
955
956 void class_unlink_export(struct obd_export *exp)
957 {
958         class_handle_unhash(&exp->exp_handle);
959
960         spin_lock(&exp->exp_obd->obd_dev_lock);
961         /* delete an uuid-export hashitem from hashtables */
962         if (!hlist_unhashed(&exp->exp_uuid_hash))
963                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
964                              &exp->exp_client_uuid,
965                              &exp->exp_uuid_hash);
966
967         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
968         list_del_init(&exp->exp_obd_chain_timed);
969         exp->exp_obd->obd_num_exports--;
970         spin_unlock(&exp->exp_obd->obd_dev_lock);
971         class_export_put(exp);
972 }
973 EXPORT_SYMBOL(class_unlink_export);
974
975 /* Import management functions */
976 void class_import_destroy(struct obd_import *imp)
977 {
978         ENTRY;
979
980         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
981                 imp->imp_obd->obd_name);
982
983         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
984
985         ptlrpc_put_connection_superhack(imp->imp_connection);
986
987         while (!list_empty(&imp->imp_conn_list)) {
988                 struct obd_import_conn *imp_conn;
989
990                 imp_conn = list_entry(imp->imp_conn_list.next,
991                                       struct obd_import_conn, oic_item);
992                 list_del_init(&imp_conn->oic_item);
993                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
994                 OBD_FREE(imp_conn, sizeof(*imp_conn));
995         }
996
997         LASSERT(imp->imp_sec == NULL);
998         class_decref(imp->imp_obd, "import", imp);
999         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
1000         EXIT;
1001 }
1002
/* Handle-ops callback: take a reference on the import behind a handle. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
1007
1008 static struct portals_handle_ops import_handle_ops = {
1009         .hop_addref = import_handle_addref,
1010         .hop_free   = NULL,
1011 };
1012
1013 struct obd_import *class_import_get(struct obd_import *import)
1014 {
1015         atomic_inc(&import->imp_refcount);
1016         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1017                atomic_read(&import->imp_refcount),
1018                import->imp_obd->obd_name);
1019         return import;
1020 }
1021 EXPORT_SYMBOL(class_import_get);
1022
1023 void class_import_put(struct obd_import *imp)
1024 {
1025         ENTRY;
1026
1027         LASSERT(list_empty(&imp->imp_zombie_chain));
1028         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1029
1030         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1031                atomic_read(&imp->imp_refcount) - 1,
1032                imp->imp_obd->obd_name);
1033
1034         if (atomic_dec_and_test(&imp->imp_refcount)) {
1035                 CDEBUG(D_INFO, "final put import %p\n", imp);
1036                 obd_zombie_import_add(imp);
1037         }
1038
1039         /* catch possible import put race */
1040         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1041         EXIT;
1042 }
1043 EXPORT_SYMBOL(class_import_put);
1044
1045 static void init_imp_at(struct imp_at *at) {
1046         int i;
1047         at_init(&at->iat_net_latency, 0, 0);
1048         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1049                 /* max service estimates are tracked on the server side, so
1050                    don't use the AT history here, just use the last reported
1051                    val. (But keep hist for proc histogram, worst_ever) */
1052                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1053                         AT_FLG_NOHIST);
1054         }
1055 }
1056
1057 struct obd_import *class_new_import(struct obd_device *obd)
1058 {
1059         struct obd_import *imp;
1060
1061         OBD_ALLOC(imp, sizeof(*imp));
1062         if (imp == NULL)
1063                 return NULL;
1064
1065         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1066         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1067         INIT_LIST_HEAD(&imp->imp_replay_list);
1068         INIT_LIST_HEAD(&imp->imp_sending_list);
1069         INIT_LIST_HEAD(&imp->imp_delayed_list);
1070         INIT_LIST_HEAD(&imp->imp_committed_list);
1071         imp->imp_replay_cursor = &imp->imp_committed_list;
1072         spin_lock_init(&imp->imp_lock);
1073         imp->imp_last_success_conn = 0;
1074         imp->imp_state = LUSTRE_IMP_NEW;
1075         imp->imp_obd = class_incref(obd, "import", imp);
1076         mutex_init(&imp->imp_sec_mutex);
1077         init_waitqueue_head(&imp->imp_recovery_waitq);
1078
1079         atomic_set(&imp->imp_refcount, 2);
1080         atomic_set(&imp->imp_unregistering, 0);
1081         atomic_set(&imp->imp_inflight, 0);
1082         atomic_set(&imp->imp_replay_inflight, 0);
1083         atomic_set(&imp->imp_inval_count, 0);
1084         INIT_LIST_HEAD(&imp->imp_conn_list);
1085         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1086         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1087         init_imp_at(&imp->imp_at);
1088
1089         /* the default magic is V2, will be used in connect RPC, and
1090          * then adjusted according to the flags in request/reply. */
1091         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1092
1093         return imp;
1094 }
1095 EXPORT_SYMBOL(class_new_import);
1096
1097 void class_destroy_import(struct obd_import *import)
1098 {
1099         LASSERT(import != NULL);
1100         LASSERT(import != LP_POISON);
1101
1102         class_handle_unhash(&import->imp_handle);
1103
1104         spin_lock(&import->imp_lock);
1105         import->imp_generation++;
1106         spin_unlock(&import->imp_lock);
1107         class_import_put(import);
1108 }
1109 EXPORT_SYMBOL(class_destroy_import);
1110
1111 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1112
1113 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1114 {
1115         spin_lock(&exp->exp_locks_list_guard);
1116
1117         LASSERT(lock->l_exp_refs_nr >= 0);
1118
1119         if (lock->l_exp_refs_target != NULL &&
1120             lock->l_exp_refs_target != exp) {
1121                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1122                               exp, lock, lock->l_exp_refs_target);
1123         }
1124         if ((lock->l_exp_refs_nr ++) == 0) {
1125                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1126                 lock->l_exp_refs_target = exp;
1127         }
1128         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1129                lock, exp, lock->l_exp_refs_nr);
1130         spin_unlock(&exp->exp_locks_list_guard);
1131 }
1132 EXPORT_SYMBOL(__class_export_add_lock_ref);
1133
1134 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1135 {
1136         spin_lock(&exp->exp_locks_list_guard);
1137         LASSERT(lock->l_exp_refs_nr > 0);
1138         if (lock->l_exp_refs_target != exp) {
1139                 LCONSOLE_WARN("lock %p, "
1140                               "mismatching export pointers: %p, %p\n",
1141                               lock, lock->l_exp_refs_target, exp);
1142         }
1143         if (-- lock->l_exp_refs_nr == 0) {
1144                 list_del_init(&lock->l_exp_refs_link);
1145                 lock->l_exp_refs_target = NULL;
1146         }
1147         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1148                lock, exp, lock->l_exp_refs_nr);
1149         spin_unlock(&exp->exp_locks_list_guard);
1150 }
1151 EXPORT_SYMBOL(__class_export_del_lock_ref);
1152 #endif
1153
1154 /* A connection defines an export context in which preallocation can
1155    be managed. This releases the export pointer reference, and returns
1156    the export handle, so the export refcount is 1 when this function
1157    returns. */
1158 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1159                   struct obd_uuid *cluuid)
1160 {
1161         struct obd_export *export;
1162         LASSERT(conn != NULL);
1163         LASSERT(obd != NULL);
1164         LASSERT(cluuid != NULL);
1165         ENTRY;
1166
1167         export = class_new_export(obd, cluuid);
1168         if (IS_ERR(export))
1169                 RETURN(PTR_ERR(export));
1170
1171         conn->cookie = export->exp_handle.h_cookie;
1172         class_export_put(export);
1173
1174         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1175                cluuid->uuid, conn->cookie);
1176         RETURN(0);
1177 }
1178 EXPORT_SYMBOL(class_connect);
1179
1180 /* if export is involved in recovery then clean up related things */
1181 void class_export_recovery_cleanup(struct obd_export *exp)
1182 {
1183         struct obd_device *obd = exp->exp_obd;
1184
1185         spin_lock(&obd->obd_recovery_task_lock);
1186         if (obd->obd_recovering) {
1187                 if (exp->exp_in_recovery) {
1188                         spin_lock(&exp->exp_lock);
1189                         exp->exp_in_recovery = 0;
1190                         spin_unlock(&exp->exp_lock);
1191                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1192                         atomic_dec(&obd->obd_connected_clients);
1193                 }
1194
1195                 /* if called during recovery then should update
1196                  * obd_stale_clients counter,
1197                  * lightweight exports are not counted */
1198                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1199                         exp->exp_obd->obd_stale_clients++;
1200         }
1201         spin_unlock(&obd->obd_recovery_task_lock);
1202
1203         spin_lock(&exp->exp_lock);
1204         /** Cleanup req replay fields */
1205         if (exp->exp_req_replay_needed) {
1206                 exp->exp_req_replay_needed = 0;
1207
1208                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1209                 atomic_dec(&obd->obd_req_replay_clients);
1210         }
1211
1212         /** Cleanup lock replay data */
1213         if (exp->exp_lock_replay_needed) {
1214                 exp->exp_lock_replay_needed = 0;
1215
1216                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1217                 atomic_dec(&obd->obd_lock_replay_clients);
1218         }
1219         spin_unlock(&exp->exp_lock);
1220 }
1221
1222 /* This function removes 1-3 references from the export:
1223  * 1 - for export pointer passed
1224  * and if disconnect really need
1225  * 2 - removing from hash
1226  * 3 - in client_unlink_export
1227  * The export pointer passed to this function can destroyed */
1228 int class_disconnect(struct obd_export *export)
1229 {
1230         int already_disconnected;
1231         ENTRY;
1232
1233         if (export == NULL) {
1234                 CWARN("attempting to free NULL export %p\n", export);
1235                 RETURN(-EINVAL);
1236         }
1237
1238         spin_lock(&export->exp_lock);
1239         already_disconnected = export->exp_disconnected;
1240         export->exp_disconnected = 1;
1241         spin_unlock(&export->exp_lock);
1242
1243         /* class_cleanup(), abort_recovery(), and class_fail_export()
1244          * all end up in here, and if any of them race we shouldn't
1245          * call extra class_export_puts(). */
1246         if (already_disconnected) {
1247                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1248                 GOTO(no_disconn, already_disconnected);
1249         }
1250
1251         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1252                export->exp_handle.h_cookie);
1253
1254         if (!hlist_unhashed(&export->exp_nid_hash))
1255                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1256                              &export->exp_connection->c_peer.nid,
1257                              &export->exp_nid_hash);
1258
1259         class_export_recovery_cleanup(export);
1260         class_unlink_export(export);
1261 no_disconn:
1262         class_export_put(export);
1263         RETURN(0);
1264 }
1265 EXPORT_SYMBOL(class_disconnect);
1266
1267 /* Return non-zero for a fully connected export */
1268 int class_connected_export(struct obd_export *exp)
1269 {
1270         int connected = 0;
1271
1272         if (exp) {
1273                 spin_lock(&exp->exp_lock);
1274                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1275                 spin_unlock(&exp->exp_lock);
1276         }
1277         return connected;
1278 }
1279 EXPORT_SYMBOL(class_connected_export);
1280
1281 static void class_disconnect_export_list(struct list_head *list,
1282                                          enum obd_option flags)
1283 {
1284         int rc;
1285         struct obd_export *exp;
1286         ENTRY;
1287
1288         /* It's possible that an export may disconnect itself, but
1289          * nothing else will be added to this list. */
1290         while (!list_empty(list)) {
1291                 exp = list_entry(list->next, struct obd_export,
1292                                  exp_obd_chain);
1293                 /* need for safe call CDEBUG after obd_disconnect */
1294                 class_export_get(exp);
1295
1296                 spin_lock(&exp->exp_lock);
1297                 exp->exp_flags = flags;
1298                 spin_unlock(&exp->exp_lock);
1299
1300                 if (obd_uuid_equals(&exp->exp_client_uuid,
1301                                     &exp->exp_obd->obd_uuid)) {
1302                         CDEBUG(D_HA,
1303                                "exp %p export uuid == obd uuid, don't discon\n",
1304                                exp);
1305                         /* Need to delete this now so we don't end up pointing
1306                          * to work_list later when this export is cleaned up. */
1307                         list_del_init(&exp->exp_obd_chain);
1308                         class_export_put(exp);
1309                         continue;
1310                 }
1311
1312                 class_export_get(exp);
1313                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1314                        "last request at "CFS_TIME_T"\n",
1315                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1316                        exp, exp->exp_last_request_time);
1317                 /* release one export reference anyway */
1318                 rc = obd_disconnect(exp);
1319
1320                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1321                        obd_export_nid2str(exp), exp, rc);
1322                 class_export_put(exp);
1323         }
1324         EXIT;
1325 }
1326
1327 void class_disconnect_exports(struct obd_device *obd)
1328 {
1329         struct list_head work_list;
1330         ENTRY;
1331
1332         /* Move all of the exports from obd_exports to a work list, en masse. */
1333         INIT_LIST_HEAD(&work_list);
1334         spin_lock(&obd->obd_dev_lock);
1335         list_splice_init(&obd->obd_exports, &work_list);
1336         list_splice_init(&obd->obd_delayed_exports, &work_list);
1337         spin_unlock(&obd->obd_dev_lock);
1338
1339         if (!list_empty(&work_list)) {
1340                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1341                        "disconnecting them\n", obd->obd_minor, obd);
1342                 class_disconnect_export_list(&work_list,
1343                                              exp_flags_from_obd(obd));
1344         } else
1345                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1346                        obd->obd_minor, obd);
1347         EXIT;
1348 }
1349 EXPORT_SYMBOL(class_disconnect_exports);
1350
1351 /* Remove exports that have not completed recovery.
1352  */
1353 void class_disconnect_stale_exports(struct obd_device *obd,
1354                                     int (*test_export)(struct obd_export *))
1355 {
1356         struct list_head work_list;
1357         struct obd_export *exp, *n;
1358         int evicted = 0;
1359         ENTRY;
1360
1361         INIT_LIST_HEAD(&work_list);
1362         spin_lock(&obd->obd_dev_lock);
1363         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1364                                  exp_obd_chain) {
1365                 /* don't count self-export as client */
1366                 if (obd_uuid_equals(&exp->exp_client_uuid,
1367                                     &exp->exp_obd->obd_uuid))
1368                         continue;
1369
1370                 /* don't evict clients which have no slot in last_rcvd
1371                  * (e.g. lightweight connection) */
1372                 if (exp->exp_target_data.ted_lr_idx == -1)
1373                         continue;
1374
1375                 spin_lock(&exp->exp_lock);
1376                 if (exp->exp_failed || test_export(exp)) {
1377                         spin_unlock(&exp->exp_lock);
1378                         continue;
1379                 }
1380                 exp->exp_failed = 1;
1381                 spin_unlock(&exp->exp_lock);
1382
1383                 list_move(&exp->exp_obd_chain, &work_list);
1384                 evicted++;
1385                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1386                        obd->obd_name, exp->exp_client_uuid.uuid,
1387                        exp->exp_connection == NULL ? "<unknown>" :
1388                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1389                 print_export_data(exp, "EVICTING", 0);
1390         }
1391         spin_unlock(&obd->obd_dev_lock);
1392
1393         if (evicted)
1394                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1395                               obd->obd_name, evicted);
1396
1397         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1398                                                  OBD_OPT_ABORT_RECOV);
1399         EXIT;
1400 }
1401 EXPORT_SYMBOL(class_disconnect_stale_exports);
1402
1403 void class_fail_export(struct obd_export *exp)
1404 {
1405         int rc, already_failed;
1406
1407         spin_lock(&exp->exp_lock);
1408         already_failed = exp->exp_failed;
1409         exp->exp_failed = 1;
1410         spin_unlock(&exp->exp_lock);
1411
1412         if (already_failed) {
1413                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1414                        exp, exp->exp_client_uuid.uuid);
1415                 return;
1416         }
1417
1418         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1419                exp, exp->exp_client_uuid.uuid);
1420
1421         if (obd_dump_on_timeout)
1422                 libcfs_debug_dumplog();
1423
1424         /* need for safe call CDEBUG after obd_disconnect */
1425         class_export_get(exp);
1426
1427         /* Most callers into obd_disconnect are removing their own reference
1428          * (request, for example) in addition to the one from the hash table.
1429          * We don't have such a reference here, so make one. */
1430         class_export_get(exp);
1431         rc = obd_disconnect(exp);
1432         if (rc)
1433                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1434         else
1435                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1436                        exp, exp->exp_client_uuid.uuid);
1437         class_export_put(exp);
1438 }
1439 EXPORT_SYMBOL(class_fail_export);
1440
1441 char *obd_export_nid2str(struct obd_export *exp)
1442 {
1443         if (exp->exp_connection != NULL)
1444                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1445
1446         return "(no nid)";
1447 }
1448 EXPORT_SYMBOL(obd_export_nid2str);
1449
1450 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1451 {
1452         cfs_hash_t *nid_hash;
1453         struct obd_export *doomed_exp = NULL;
1454         int exports_evicted = 0;
1455
1456         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1457
1458         spin_lock(&obd->obd_dev_lock);
1459         /* umount has run already, so evict thread should leave
1460          * its task to umount thread now */
1461         if (obd->obd_stopping) {
1462                 spin_unlock(&obd->obd_dev_lock);
1463                 return exports_evicted;
1464         }
1465         nid_hash = obd->obd_nid_hash;
1466         cfs_hash_getref(nid_hash);
1467         spin_unlock(&obd->obd_dev_lock);
1468
1469         do {
1470                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1471                 if (doomed_exp == NULL)
1472                         break;
1473
1474                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1475                          "nid %s found, wanted nid %s, requested nid %s\n",
1476                          obd_export_nid2str(doomed_exp),
1477                          libcfs_nid2str(nid_key), nid);
1478                 LASSERTF(doomed_exp != obd->obd_self_export,
1479                          "self-export is hashed by NID?\n");
1480                 exports_evicted++;
1481                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1482                               "request\n", obd->obd_name,
1483                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1484                               obd_export_nid2str(doomed_exp));
1485                 class_fail_export(doomed_exp);
1486                 class_export_put(doomed_exp);
1487         } while (1);
1488
1489         cfs_hash_putref(nid_hash);
1490
1491         if (!exports_evicted)
1492                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1493                        obd->obd_name, nid);
1494         return exports_evicted;
1495 }
1496 EXPORT_SYMBOL(obd_export_evict_by_nid);
1497
1498 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1499 {
1500         cfs_hash_t *uuid_hash;
1501         struct obd_export *doomed_exp = NULL;
1502         struct obd_uuid doomed_uuid;
1503         int exports_evicted = 0;
1504
1505         spin_lock(&obd->obd_dev_lock);
1506         if (obd->obd_stopping) {
1507                 spin_unlock(&obd->obd_dev_lock);
1508                 return exports_evicted;
1509         }
1510         uuid_hash = obd->obd_uuid_hash;
1511         cfs_hash_getref(uuid_hash);
1512         spin_unlock(&obd->obd_dev_lock);
1513
1514         obd_str2uuid(&doomed_uuid, uuid);
1515         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1516                 CERROR("%s: can't evict myself\n", obd->obd_name);
1517                 cfs_hash_putref(uuid_hash);
1518                 return exports_evicted;
1519         }
1520
1521         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1522
1523         if (doomed_exp == NULL) {
1524                 CERROR("%s: can't disconnect %s: no exports found\n",
1525                        obd->obd_name, uuid);
1526         } else {
1527                 CWARN("%s: evicting %s at adminstrative request\n",
1528                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1529                 class_fail_export(doomed_exp);
1530                 class_export_put(doomed_exp);
1531                 exports_evicted++;
1532         }
1533         cfs_hash_putref(uuid_hash);
1534
1535         return exports_evicted;
1536 }
1537 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1538
1539 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1540 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1541 EXPORT_SYMBOL(class_export_dump_hook);
1542 #endif
1543
1544 static void print_export_data(struct obd_export *exp, const char *status,
1545                               int locks)
1546 {
1547         struct ptlrpc_reply_state *rs;
1548         struct ptlrpc_reply_state *first_reply = NULL;
1549         int nreplies = 0;
1550
1551         spin_lock(&exp->exp_lock);
1552         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1553                             rs_exp_list) {
1554                 if (nreplies == 0)
1555                         first_reply = rs;
1556                 nreplies++;
1557         }
1558         spin_unlock(&exp->exp_lock);
1559
1560         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1561                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1562                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1563                atomic_read(&exp->exp_rpc_count),
1564                atomic_read(&exp->exp_cb_count),
1565                atomic_read(&exp->exp_locks_count),
1566                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1567                nreplies, first_reply, nreplies > 3 ? "..." : "",
1568                exp->exp_last_committed);
1569 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1570         if (locks && class_export_dump_hook != NULL)
1571                 class_export_dump_hook(exp);
1572 #endif
1573 }
1574
1575 void dump_exports(struct obd_device *obd, int locks)
1576 {
1577         struct obd_export *exp;
1578
1579         spin_lock(&obd->obd_dev_lock);
1580         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1581                 print_export_data(exp, "ACTIVE", locks);
1582         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1583                 print_export_data(exp, "UNLINKED", locks);
1584         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1585                 print_export_data(exp, "DELAYED", locks);
1586         spin_unlock(&obd->obd_dev_lock);
1587         spin_lock(&obd_zombie_impexp_lock);
1588         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1589                 print_export_data(exp, "ZOMBIE", locks);
1590         spin_unlock(&obd_zombie_impexp_lock);
1591 }
1592 EXPORT_SYMBOL(dump_exports);
1593
1594 void obd_exports_barrier(struct obd_device *obd)
1595 {
1596         int waited = 2;
1597         LASSERT(list_empty(&obd->obd_exports));
1598         spin_lock(&obd->obd_dev_lock);
1599         while (!list_empty(&obd->obd_unlinked_exports)) {
1600                 spin_unlock(&obd->obd_dev_lock);
1601                 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1602                                                    cfs_time_seconds(waited));
1603                 if (waited > 5 && IS_PO2(waited)) {
1604                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1605                                       "more than %d seconds. "
1606                                       "The obd refcount = %d. Is it stuck?\n",
1607                                       obd->obd_name, waited,
1608                                       atomic_read(&obd->obd_refcount));
1609                         dump_exports(obd, 1);
1610                 }
1611                 waited *= 2;
1612                 spin_lock(&obd->obd_dev_lock);
1613         }
1614         spin_unlock(&obd->obd_dev_lock);
1615 }
1616 EXPORT_SYMBOL(obd_exports_barrier);
1617
/* Number of zombie imports and exports queued but not yet destroyed;
 * modified under obd_zombie_impexp_lock. */
static int zombies_count;
1620
1621 /**
1622  * kill zombie imports and exports
1623  */
1624 void obd_zombie_impexp_cull(void)
1625 {
1626         struct obd_import *import;
1627         struct obd_export *export;
1628         ENTRY;
1629
1630         do {
1631                 spin_lock(&obd_zombie_impexp_lock);
1632
1633                 import = NULL;
1634                 if (!list_empty(&obd_zombie_imports)) {
1635                         import = list_entry(obd_zombie_imports.next,
1636                                             struct obd_import,
1637                                             imp_zombie_chain);
1638                         list_del_init(&import->imp_zombie_chain);
1639                 }
1640
1641                 export = NULL;
1642                 if (!list_empty(&obd_zombie_exports)) {
1643                         export = list_entry(obd_zombie_exports.next,
1644                                             struct obd_export,
1645                                             exp_obd_chain);
1646                         list_del_init(&export->exp_obd_chain);
1647                 }
1648
1649                 spin_unlock(&obd_zombie_impexp_lock);
1650
1651                 if (import != NULL) {
1652                         class_import_destroy(import);
1653                         spin_lock(&obd_zombie_impexp_lock);
1654                         zombies_count--;
1655                         spin_unlock(&obd_zombie_impexp_lock);
1656                 }
1657
1658                 if (export != NULL) {
1659                         class_export_destroy(export);
1660                         spin_lock(&obd_zombie_impexp_lock);
1661                         zombies_count--;
1662                         spin_unlock(&obd_zombie_impexp_lock);
1663                 }
1664
1665                 cond_resched();
1666         } while (import != NULL || export != NULL);
1667         EXIT;
1668 }
1669
1670 static struct completion        obd_zombie_start;
1671 static struct completion        obd_zombie_stop;
1672 static unsigned long            obd_zombie_flags;
1673 static wait_queue_head_t        obd_zombie_waitq;
1674 static pid_t                    obd_zombie_pid;
1675
/*
 * Flags kept in obd_zombie_flags.  The value is handed to set_bit() and
 * test_bit() as their first argument.
 */
enum {
	/* The zombie thread has been asked to exit. */
	OBD_ZOMBIE_STOP = 1,
};
1679
1680 /**
1681  * check for work for kill zombie import/export thread.
1682  */
1683 static int obd_zombie_impexp_check(void *arg)
1684 {
1685         int rc;
1686
1687         spin_lock(&obd_zombie_impexp_lock);
1688         rc = (zombies_count == 0) &&
1689              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1690         spin_unlock(&obd_zombie_impexp_lock);
1691
1692         RETURN(rc);
1693 }
1694
1695 /**
1696  * Add export to the obd_zombe thread and notify it.
1697  */
1698 static void obd_zombie_export_add(struct obd_export *exp) {
1699         spin_lock(&exp->exp_obd->obd_dev_lock);
1700         LASSERT(!list_empty(&exp->exp_obd_chain));
1701         list_del_init(&exp->exp_obd_chain);
1702         spin_unlock(&exp->exp_obd->obd_dev_lock);
1703         spin_lock(&obd_zombie_impexp_lock);
1704         zombies_count++;
1705         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1706         spin_unlock(&obd_zombie_impexp_lock);
1707
1708         obd_zombie_impexp_notify();
1709 }
1710
1711 /**
1712  * Add import to the obd_zombe thread and notify it.
1713  */
1714 static void obd_zombie_import_add(struct obd_import *imp) {
1715         LASSERT(imp->imp_sec == NULL);
1716         LASSERT(imp->imp_rq_pool == NULL);
1717         spin_lock(&obd_zombie_impexp_lock);
1718         LASSERT(list_empty(&imp->imp_zombie_chain));
1719         zombies_count++;
1720         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1721         spin_unlock(&obd_zombie_impexp_lock);
1722
1723         obd_zombie_impexp_notify();
1724 }
1725
1726 /**
1727  * notify import/export destroy thread about new zombie.
1728  */
1729 static void obd_zombie_impexp_notify(void)
1730 {
1731         /*
1732          * Make sure obd_zomebie_impexp_thread get this notification.
1733          * It is possible this signal only get by obd_zombie_barrier, and
1734          * barrier gulps this notification and sleeps away and hangs ensues
1735          */
1736         wake_up_all(&obd_zombie_waitq);
1737 }
1738
1739 /**
1740  * check whether obd_zombie is idle
1741  */
1742 static int obd_zombie_is_idle(void)
1743 {
1744         int rc;
1745
1746         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1747         spin_lock(&obd_zombie_impexp_lock);
1748         rc = (zombies_count == 0);
1749         spin_unlock(&obd_zombie_impexp_lock);
1750         return rc;
1751 }
1752
1753 /**
1754  * wait when obd_zombie import/export queues become empty
1755  */
1756 void obd_zombie_barrier(void)
1757 {
1758         struct l_wait_info lwi = { 0 };
1759
1760         if (obd_zombie_pid == current_pid())
1761                 /* don't wait for myself */
1762                 return;
1763         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1764 }
1765 EXPORT_SYMBOL(obd_zombie_barrier);
1766
1767
1768 /**
1769  * destroy zombie export/import thread.
1770  */
1771 static int obd_zombie_impexp_thread(void *unused)
1772 {
1773         unshare_fs_struct();
1774         complete(&obd_zombie_start);
1775
1776         obd_zombie_pid = current_pid();
1777
1778         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1779                 struct l_wait_info lwi = { 0 };
1780
1781                 l_wait_event(obd_zombie_waitq,
1782                              !obd_zombie_impexp_check(NULL), &lwi);
1783                 obd_zombie_impexp_cull();
1784
1785                 /*
1786                  * Notify obd_zombie_barrier callers that queues
1787                  * may be empty.
1788                  */
1789                 wake_up(&obd_zombie_waitq);
1790         }
1791
1792         complete(&obd_zombie_stop);
1793
1794         RETURN(0);
1795 }
1796
1797
1798 /**
1799  * start destroy zombie import/export thread
1800  */
1801 int obd_zombie_impexp_init(void)
1802 {
1803         struct task_struct *task;
1804
1805         INIT_LIST_HEAD(&obd_zombie_imports);
1806
1807         INIT_LIST_HEAD(&obd_zombie_exports);
1808         spin_lock_init(&obd_zombie_impexp_lock);
1809         init_completion(&obd_zombie_start);
1810         init_completion(&obd_zombie_stop);
1811         init_waitqueue_head(&obd_zombie_waitq);
1812         obd_zombie_pid = 0;
1813
1814         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1815         if (IS_ERR(task))
1816                 RETURN(PTR_ERR(task));
1817
1818         wait_for_completion(&obd_zombie_start);
1819         RETURN(0);
1820 }
1821 /**
1822  * stop destroy zombie import/export thread
1823  */
1824 void obd_zombie_impexp_stop(void)
1825 {
1826         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1827         obd_zombie_impexp_notify();
1828         wait_for_completion(&obd_zombie_stop);
1829 }
1830
1831 /***** Kernel-userspace comm helpers *******/
1832
1833 /* Get length of entire message, including header */
1834 int kuc_len(int payload_len)
1835 {
1836         return sizeof(struct kuc_hdr) + payload_len;
1837 }
1838 EXPORT_SYMBOL(kuc_len);
1839
1840 /* Get a pointer to kuc header, given a ptr to the payload
1841  * @param p Pointer to payload area
1842  * @returns Pointer to kuc header
1843  */
1844 struct kuc_hdr * kuc_ptr(void *p)
1845 {
1846         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1847         LASSERT(lh->kuc_magic == KUC_MAGIC);
1848         return lh;
1849 }
1850 EXPORT_SYMBOL(kuc_ptr);
1851
1852 /* Test if payload is part of kuc message
1853  * @param p Pointer to payload area
1854  * @returns boolean
1855  */
1856 int kuc_ispayload(void *p)
1857 {
1858         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1859
1860         if (kh->kuc_magic == KUC_MAGIC)
1861                 return 1;
1862         else
1863                 return 0;
1864 }
1865 EXPORT_SYMBOL(kuc_ispayload);
1866
1867 /* Alloc space for a message, and fill in header
1868  * @return Pointer to payload area
1869  */
1870 void *kuc_alloc(int payload_len, int transport, int type)
1871 {
1872         struct kuc_hdr *lh;
1873         int len = kuc_len(payload_len);
1874
1875         OBD_ALLOC(lh, len);
1876         if (lh == NULL)
1877                 return ERR_PTR(-ENOMEM);
1878
1879         lh->kuc_magic = KUC_MAGIC;
1880         lh->kuc_transport = transport;
1881         lh->kuc_msgtype = type;
1882         lh->kuc_msglen = len;
1883
1884         return (void *)(lh + 1);
1885 }
1886 EXPORT_SYMBOL(kuc_alloc);
1887
/* Free a message, given a pointer to its payload area.
 *
 * The size handed to OBD_FREE() is kuc_len(payload_len), so \a payload_len
 * should be the same length that was given to kuc_alloc().
 *
 * This is deliberately not declared "inline": the function is exported, and
 * a plain "inline" definition does not provide an external definition under
 * C99 and later inline semantics.
 *
 * @param p Pointer to payload area (not to the header)
 * @param payload_len Length of the payload area
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh = kuc_ptr(p);

	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1895
1896 struct obd_request_slot_waiter {
1897         struct list_head        orsw_entry;
1898         wait_queue_head_t       orsw_waitq;
1899         bool                    orsw_signaled;
1900 };
1901
1902 static bool obd_request_slot_avail(struct client_obd *cli,
1903                                    struct obd_request_slot_waiter *orsw)
1904 {
1905         bool avail;
1906
1907         client_obd_list_lock(&cli->cl_loi_list_lock);
1908         avail = !!list_empty(&orsw->orsw_entry);
1909         client_obd_list_unlock(&cli->cl_loi_list_lock);
1910
1911         return avail;
1912 };
1913
1914 /*
1915  * For network flow control, the RPC sponsor needs to acquire a credit
1916  * before sending the RPC. The credits count for a connection is defined
1917  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1918  * the subsequent RPC sponsors need to wait until others released their
1919  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1920  */
1921 int obd_get_request_slot(struct client_obd *cli)
1922 {
1923         struct obd_request_slot_waiter   orsw;
1924         struct l_wait_info               lwi;
1925         int                              rc;
1926
1927         client_obd_list_lock(&cli->cl_loi_list_lock);
1928         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1929                 cli->cl_r_in_flight++;
1930                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1931                 return 0;
1932         }
1933
1934         init_waitqueue_head(&orsw.orsw_waitq);
1935         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1936         orsw.orsw_signaled = false;
1937         client_obd_list_unlock(&cli->cl_loi_list_lock);
1938
1939         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1940         rc = l_wait_event(orsw.orsw_waitq,
1941                           obd_request_slot_avail(cli, &orsw) ||
1942                           orsw.orsw_signaled,
1943                           &lwi);
1944
1945         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1946          * freed but other (such as obd_put_request_slot) is using it. */
1947         client_obd_list_lock(&cli->cl_loi_list_lock);
1948         if (rc != 0) {
1949                 if (!orsw.orsw_signaled) {
1950                         if (list_empty(&orsw.orsw_entry))
1951                                 cli->cl_r_in_flight--;
1952                         else
1953                                 list_del(&orsw.orsw_entry);
1954                 }
1955         }
1956
1957         if (orsw.orsw_signaled) {
1958                 LASSERT(list_empty(&orsw.orsw_entry));
1959
1960                 rc = -EINTR;
1961         }
1962         client_obd_list_unlock(&cli->cl_loi_list_lock);
1963
1964         return rc;
1965 }
1966 EXPORT_SYMBOL(obd_get_request_slot);
1967
1968 void obd_put_request_slot(struct client_obd *cli)
1969 {
1970         struct obd_request_slot_waiter *orsw;
1971
1972         client_obd_list_lock(&cli->cl_loi_list_lock);
1973         cli->cl_r_in_flight--;
1974
1975         /* If there is free slot, wakeup the first waiter. */
1976         if (!list_empty(&cli->cl_loi_read_list) &&
1977             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
1978                 orsw = list_entry(cli->cl_loi_read_list.next,
1979                                   struct obd_request_slot_waiter, orsw_entry);
1980                 list_del_init(&orsw->orsw_entry);
1981                 cli->cl_r_in_flight++;
1982                 wake_up(&orsw->orsw_waitq);
1983         }
1984         client_obd_list_unlock(&cli->cl_loi_list_lock);
1985 }
1986 EXPORT_SYMBOL(obd_put_request_slot);
1987
1988 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1989 {
1990         return cli->cl_max_rpcs_in_flight;
1991 }
1992 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
1993
1994 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1995 {
1996         struct obd_request_slot_waiter *orsw;
1997         __u32                           old;
1998         int                             diff;
1999         int                             i;
2000
2001         if (max > OBD_MAX_RIF_MAX || max < 1)
2002                 return -ERANGE;
2003
2004         client_obd_list_lock(&cli->cl_loi_list_lock);
2005         old = cli->cl_max_rpcs_in_flight;
2006         cli->cl_max_rpcs_in_flight = max;
2007         diff = max - old;
2008
2009         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2010         for (i = 0; i < diff; i++) {
2011                 if (list_empty(&cli->cl_loi_read_list))
2012                         break;
2013
2014                 orsw = list_entry(cli->cl_loi_read_list.next,
2015                                   struct obd_request_slot_waiter, orsw_entry);
2016                 list_del_init(&orsw->orsw_entry);
2017                 cli->cl_r_in_flight++;
2018                 wake_up(&orsw->orsw_waitq);
2019         }
2020         client_obd_list_unlock(&cli->cl_loi_list_lock);
2021
2022         return 0;
2023 }
2024 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);