Whamcloud - gitweb
LU-4942 at: per-export lock callback timeout
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include <obd_class.h>
44 #include <lprocfs_status.h>
45
/* Taken around walks of, and insertions into/removals from, the obd_types
 * list (see class_search_type(), class_register_type() and
 * class_unregister_type()). */
spinlock_t obd_types_lock;

/* Slab caches created in obd_init_caches() and torn down in
 * obd_cleanup_caches(): one each for struct obd_device, struct obdo and
 * struct obd_import. */
struct kmem_cache *obd_device_cachep;
struct kmem_cache *obdo_cachep;
EXPORT_SYMBOL(obdo_cachep);
struct kmem_cache *import_cachep;

/* NOTE(review): state and helpers for "zombie" imports/exports.  Only
 * obd_zombie_export_add() is used in the part of the file visible here
 * (from class_export_put() on the final reference drop); the definitions
 * live further down -- confirm details there. */
struct list_head obd_zombie_imports;
struct list_head obd_zombie_exports;
spinlock_t  obd_zombie_impexp_lock;
static void obd_zombie_impexp_notify(void);
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks);

/* Function pointer invoked by class_export_destroy() to release an export's
 * exp_connection.  It is not assigned anywhere in the visible part of this
 * file, so presumably another module installs it -- TODO confirm. */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
64
65 /*
66  * support functions: we could use inter-module communication, but this
67  * is more portable to other OS's
68  */
69 static struct obd_device *obd_device_alloc(void)
70 {
71         struct obd_device *obd;
72
73         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
74         if (obd != NULL) {
75                 obd->obd_magic = OBD_DEVICE_MAGIC;
76         }
77         return obd;
78 }
79
80 static void obd_device_free(struct obd_device *obd)
81 {
82         LASSERT(obd != NULL);
83         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
84                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
85         if (obd->obd_namespace != NULL) {
86                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
87                        obd, obd->obd_namespace, obd->obd_force);
88                 LBUG();
89         }
90         lu_ref_fini(&obd->obd_reference);
91         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
92 }
93
94 struct obd_type *class_search_type(const char *name)
95 {
96         struct list_head *tmp;
97         struct obd_type *type;
98
99         spin_lock(&obd_types_lock);
100         list_for_each(tmp, &obd_types) {
101                 type = list_entry(tmp, struct obd_type, typ_chain);
102                 if (strcmp(type->typ_name, name) == 0) {
103                         spin_unlock(&obd_types_lock);
104                         return type;
105                 }
106         }
107         spin_unlock(&obd_types_lock);
108         return NULL;
109 }
110 EXPORT_SYMBOL(class_search_type);
111
112 struct obd_type *class_get_type(const char *name)
113 {
114         struct obd_type *type = class_search_type(name);
115
116 #ifdef HAVE_MODULE_LOADING_SUPPORT
117         if (!type) {
118                 const char *modname = name;
119
120                 if (strcmp(modname, "obdfilter") == 0)
121                         modname = "ofd";
122
123                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
124                         modname = LUSTRE_OSP_NAME;
125
126                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
127                         modname = LUSTRE_MDT_NAME;
128
129                 if (!request_module("%s", modname)) {
130                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131                         type = class_search_type(name);
132                 } else {
133                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134                                            modname);
135                 }
136         }
137 #endif
138         if (type) {
139                 spin_lock(&type->obd_type_lock);
140                 type->typ_refcnt++;
141                 try_module_get(type->typ_dt_ops->o_owner);
142                 spin_unlock(&type->obd_type_lock);
143         }
144         return type;
145 }
146 EXPORT_SYMBOL(class_get_type);
147
148 void class_put_type(struct obd_type *type)
149 {
150         LASSERT(type);
151         spin_lock(&type->obd_type_lock);
152         type->typ_refcnt--;
153         module_put(type->typ_dt_ops->o_owner);
154         spin_unlock(&type->obd_type_lock);
155 }
156 EXPORT_SYMBOL(class_put_type);
157
158 #define CLASS_MAX_NAME 1024
159
160 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
161                         bool enable_proc, struct lprocfs_seq_vars *vars,
162                         const char *name, struct lu_device_type *ldt)
163 {
164         struct obd_type *type;
165         int rc = 0;
166         ENTRY;
167
168         /* sanity check */
169         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
170
171         if (class_search_type(name)) {
172                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
173                 RETURN(-EEXIST);
174         }
175
176         rc = -ENOMEM;
177         OBD_ALLOC(type, sizeof(*type));
178         if (type == NULL)
179                 RETURN(rc);
180
181         OBD_ALLOC_PTR(type->typ_dt_ops);
182         OBD_ALLOC_PTR(type->typ_md_ops);
183         OBD_ALLOC(type->typ_name, strlen(name) + 1);
184
185         if (type->typ_dt_ops == NULL ||
186             type->typ_md_ops == NULL ||
187             type->typ_name == NULL)
188                 GOTO (failed, rc);
189
190         *(type->typ_dt_ops) = *dt_ops;
191         /* md_ops is optional */
192         if (md_ops)
193                 *(type->typ_md_ops) = *md_ops;
194         strcpy(type->typ_name, name);
195         spin_lock_init(&type->obd_type_lock);
196
197 #ifdef LPROCFS
198         if (enable_proc) {
199                 type->typ_procroot = lprocfs_seq_register(type->typ_name,
200                                                           proc_lustre_root,
201                                                           vars, type);
202                 if (IS_ERR(type->typ_procroot)) {
203                         rc = PTR_ERR(type->typ_procroot);
204                         type->typ_procroot = NULL;
205                         GOTO(failed, rc);
206                 }
207         }
208 #endif
209         if (ldt != NULL) {
210                 type->typ_lu = ldt;
211                 rc = lu_device_type_init(ldt);
212                 if (rc != 0)
213                         GOTO (failed, rc);
214         }
215
216         spin_lock(&obd_types_lock);
217         list_add(&type->typ_chain, &obd_types);
218         spin_unlock(&obd_types_lock);
219
220         RETURN (0);
221
222 failed:
223         if (type->typ_name != NULL) {
224 #ifdef LPROCFS
225                 if (type->typ_procroot != NULL) {
226 #ifndef HAVE_ONLY_PROCFS_SEQ
227                         lprocfs_try_remove_proc_entry(type->typ_name,
228                                                       proc_lustre_root);
229 #else
230                         remove_proc_subtree(type->typ_name, proc_lustre_root);
231 #endif
232                 }
233 #endif
234                 OBD_FREE(type->typ_name, strlen(name) + 1);
235         }
236         if (type->typ_md_ops != NULL)
237                 OBD_FREE_PTR(type->typ_md_ops);
238         if (type->typ_dt_ops != NULL)
239                 OBD_FREE_PTR(type->typ_dt_ops);
240         OBD_FREE(type, sizeof(*type));
241         RETURN(rc);
242 }
243 EXPORT_SYMBOL(class_register_type);
244
245 int class_unregister_type(const char *name)
246 {
247         struct obd_type *type = class_search_type(name);
248         ENTRY;
249
250         if (!type) {
251                 CERROR("unknown obd type\n");
252                 RETURN(-EINVAL);
253         }
254
255         if (type->typ_refcnt) {
256                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
257                 /* This is a bad situation, let's make the best of it */
258                 /* Remove ops, but leave the name for debugging */
259                 OBD_FREE_PTR(type->typ_dt_ops);
260                 OBD_FREE_PTR(type->typ_md_ops);
261                 RETURN(-EBUSY);
262         }
263
264         /* we do not use type->typ_procroot as for compatibility purposes
265          * other modules can share names (i.e. lod can use lov entry). so
266          * we can't reference pointer as it can get invalided when another
267          * module removes the entry */
268 #ifdef LPROCFS
269         if (type->typ_procroot != NULL) {
270 #ifndef HAVE_ONLY_PROCFS_SEQ
271                 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
272 #else
273                 remove_proc_subtree(type->typ_name, proc_lustre_root);
274 #endif
275         }
276
277         if (type->typ_procsym != NULL)
278                 lprocfs_remove(&type->typ_procsym);
279 #endif
280         if (type->typ_lu)
281                 lu_device_type_fini(type->typ_lu);
282
283         spin_lock(&obd_types_lock);
284         list_del(&type->typ_chain);
285         spin_unlock(&obd_types_lock);
286         OBD_FREE(type->typ_name, strlen(name) + 1);
287         if (type->typ_dt_ops != NULL)
288                 OBD_FREE_PTR(type->typ_dt_ops);
289         if (type->typ_md_ops != NULL)
290                 OBD_FREE_PTR(type->typ_md_ops);
291         OBD_FREE(type, sizeof(*type));
292         RETURN(0);
293 } /* class_unregister_type */
294 EXPORT_SYMBOL(class_unregister_type);
295
296 /**
297  * Create a new obd device.
298  *
299  * Find an empty slot in ::obd_devs[], create a new obd device in it.
300  *
301  * \param[in] type_name obd device type string.
302  * \param[in] name      obd device name.
303  *
304  * \retval NULL if create fails, otherwise return the obd device
305  *         pointer created.
306  */
307 struct obd_device *class_newdev(const char *type_name, const char *name)
308 {
309         struct obd_device *result = NULL;
310         struct obd_device *newdev;
311         struct obd_type *type = NULL;
312         int i;
313         int new_obd_minor = 0;
314         ENTRY;
315
316         if (strlen(name) >= MAX_OBD_NAME) {
317                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
318                 RETURN(ERR_PTR(-EINVAL));
319         }
320
321         type = class_get_type(type_name);
322         if (type == NULL){
323                 CERROR("OBD: unknown type: %s\n", type_name);
324                 RETURN(ERR_PTR(-ENODEV));
325         }
326
327         newdev = obd_device_alloc();
328         if (newdev == NULL)
329                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
330
331         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
332
333         write_lock(&obd_dev_lock);
334         for (i = 0; i < class_devno_max(); i++) {
335                 struct obd_device *obd = class_num2obd(i);
336
337                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
338                         CERROR("Device %s already exists at %d, won't add\n",
339                                name, i);
340                         if (result) {
341                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
342                                          "%p obd_magic %08x != %08x\n", result,
343                                          result->obd_magic, OBD_DEVICE_MAGIC);
344                                 LASSERTF(result->obd_minor == new_obd_minor,
345                                          "%p obd_minor %d != %d\n", result,
346                                          result->obd_minor, new_obd_minor);
347
348                                 obd_devs[result->obd_minor] = NULL;
349                                 result->obd_name[0]='\0';
350                          }
351                         result = ERR_PTR(-EEXIST);
352                         break;
353                 }
354                 if (!result && !obd) {
355                         result = newdev;
356                         result->obd_minor = i;
357                         new_obd_minor = i;
358                         result->obd_type = type;
359                         strncpy(result->obd_name, name,
360                                 sizeof(result->obd_name) - 1);
361                         obd_devs[i] = result;
362                 }
363         }
364         write_unlock(&obd_dev_lock);
365
366         if (result == NULL && i >= class_devno_max()) {
367                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
368                        class_devno_max());
369                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
370         }
371
372         if (IS_ERR(result))
373                 GOTO(out, result);
374
375         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
376                result->obd_name, result);
377
378         RETURN(result);
379 out:
380         obd_device_free(newdev);
381 out_type:
382         class_put_type(type);
383         return result;
384 }
385
386 void class_release_dev(struct obd_device *obd)
387 {
388         struct obd_type *obd_type = obd->obd_type;
389
390         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
391                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
392         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
393                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
394         LASSERT(obd_type != NULL);
395
396         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
397                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
398
399         write_lock(&obd_dev_lock);
400         obd_devs[obd->obd_minor] = NULL;
401         write_unlock(&obd_dev_lock);
402         obd_device_free(obd);
403
404         class_put_type(obd_type);
405 }
406
407 int class_name2dev(const char *name)
408 {
409         int i;
410
411         if (!name)
412                 return -1;
413
414         read_lock(&obd_dev_lock);
415         for (i = 0; i < class_devno_max(); i++) {
416                 struct obd_device *obd = class_num2obd(i);
417
418                 if (obd && strcmp(name, obd->obd_name) == 0) {
419                         /* Make sure we finished attaching before we give
420                            out any references */
421                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
422                         if (obd->obd_attached) {
423                                 read_unlock(&obd_dev_lock);
424                                 return i;
425                         }
426                         break;
427                 }
428         }
429         read_unlock(&obd_dev_lock);
430
431         return -1;
432 }
433 EXPORT_SYMBOL(class_name2dev);
434
435 struct obd_device *class_name2obd(const char *name)
436 {
437         int dev = class_name2dev(name);
438
439         if (dev < 0 || dev > class_devno_max())
440                 return NULL;
441         return class_num2obd(dev);
442 }
443 EXPORT_SYMBOL(class_name2obd);
444
445 int class_uuid2dev(struct obd_uuid *uuid)
446 {
447         int i;
448
449         read_lock(&obd_dev_lock);
450         for (i = 0; i < class_devno_max(); i++) {
451                 struct obd_device *obd = class_num2obd(i);
452
453                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
454                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
455                         read_unlock(&obd_dev_lock);
456                         return i;
457                 }
458         }
459         read_unlock(&obd_dev_lock);
460
461         return -1;
462 }
463 EXPORT_SYMBOL(class_uuid2dev);
464
465 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
466 {
467         int dev = class_uuid2dev(uuid);
468         if (dev < 0)
469                 return NULL;
470         return class_num2obd(dev);
471 }
472 EXPORT_SYMBOL(class_uuid2obd);
473
474 /**
475  * Get obd device from ::obd_devs[]
476  *
477  * \param num [in] array index
478  *
479  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
480  *         otherwise return the obd device there.
481  */
482 struct obd_device *class_num2obd(int num)
483 {
484         struct obd_device *obd = NULL;
485
486         if (num < class_devno_max()) {
487                 obd = obd_devs[num];
488                 if (obd == NULL)
489                         return NULL;
490
491                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
492                          "%p obd_magic %08x != %08x\n",
493                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
494                 LASSERTF(obd->obd_minor == num,
495                          "%p obd_minor %0d != %0d\n",
496                          obd, obd->obd_minor, num);
497         }
498
499         return obd;
500 }
501 EXPORT_SYMBOL(class_num2obd);
502
503 /**
504  * Get obd devices count. Device in any
505  *    state are counted
506  * \retval obd device count
507  */
508 int get_devices_count(void)
509 {
510         int index, max_index = class_devno_max(), dev_count = 0;
511
512         read_lock(&obd_dev_lock);
513         for (index = 0; index <= max_index; index++) {
514                 struct obd_device *obd = class_num2obd(index);
515                 if (obd != NULL)
516                         dev_count++;
517         }
518         read_unlock(&obd_dev_lock);
519
520         return dev_count;
521 }
522 EXPORT_SYMBOL(get_devices_count);
523
524 void class_obd_list(void)
525 {
526         char *status;
527         int i;
528
529         read_lock(&obd_dev_lock);
530         for (i = 0; i < class_devno_max(); i++) {
531                 struct obd_device *obd = class_num2obd(i);
532
533                 if (obd == NULL)
534                         continue;
535                 if (obd->obd_stopping)
536                         status = "ST";
537                 else if (obd->obd_set_up)
538                         status = "UP";
539                 else if (obd->obd_attached)
540                         status = "AT";
541                 else
542                         status = "--";
543                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
544                          i, status, obd->obd_type->typ_name,
545                          obd->obd_name, obd->obd_uuid.uuid,
546                          atomic_read(&obd->obd_refcount));
547         }
548         read_unlock(&obd_dev_lock);
549         return;
550 }
551
552 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
553    specified, then only the client with that uuid is returned,
554    otherwise any client connected to the tgt is returned. */
555 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
556                                           const char * typ_name,
557                                           struct obd_uuid *grp_uuid)
558 {
559         int i;
560
561         read_lock(&obd_dev_lock);
562         for (i = 0; i < class_devno_max(); i++) {
563                 struct obd_device *obd = class_num2obd(i);
564
565                 if (obd == NULL)
566                         continue;
567                 if ((strncmp(obd->obd_type->typ_name, typ_name,
568                              strlen(typ_name)) == 0)) {
569                         if (obd_uuid_equals(tgt_uuid,
570                                             &obd->u.cli.cl_target_uuid) &&
571                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
572                                                          &obd->obd_uuid) : 1)) {
573                                 read_unlock(&obd_dev_lock);
574                                 return obd;
575                         }
576                 }
577         }
578         read_unlock(&obd_dev_lock);
579
580         return NULL;
581 }
582 EXPORT_SYMBOL(class_find_client_obd);
583
584 /* Iterate the obd_device list looking devices have grp_uuid. Start
585    searching at *next, and if a device is found, the next index to look
586    at is saved in *next. If next is NULL, then the first matching device
587    will always be returned. */
588 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
589 {
590         int i;
591
592         if (next == NULL)
593                 i = 0;
594         else if (*next >= 0 && *next < class_devno_max())
595                 i = *next;
596         else
597                 return NULL;
598
599         read_lock(&obd_dev_lock);
600         for (; i < class_devno_max(); i++) {
601                 struct obd_device *obd = class_num2obd(i);
602
603                 if (obd == NULL)
604                         continue;
605                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
606                         if (next != NULL)
607                                 *next = i+1;
608                         read_unlock(&obd_dev_lock);
609                         return obd;
610                 }
611         }
612         read_unlock(&obd_dev_lock);
613
614         return NULL;
615 }
616 EXPORT_SYMBOL(class_devices_in_group);
617
618 /**
619  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
620  * adjust sptlrpc settings accordingly.
621  */
622 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
623 {
624         struct obd_device  *obd;
625         const char         *type;
626         int                 i, rc = 0, rc2;
627
628         LASSERT(namelen > 0);
629
630         read_lock(&obd_dev_lock);
631         for (i = 0; i < class_devno_max(); i++) {
632                 obd = class_num2obd(i);
633
634                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
635                         continue;
636
637                 /* only notify mdc, osc, mdt, ost */
638                 type = obd->obd_type->typ_name;
639                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
640                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
641                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
642                     strcmp(type, LUSTRE_OST_NAME) != 0)
643                         continue;
644
645                 if (strncmp(obd->obd_name, fsname, namelen))
646                         continue;
647
648                 class_incref(obd, __FUNCTION__, obd);
649                 read_unlock(&obd_dev_lock);
650                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
651                                          sizeof(KEY_SPTLRPC_CONF),
652                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
653                 rc = rc ? rc : rc2;
654                 class_decref(obd, __FUNCTION__, obd);
655                 read_lock(&obd_dev_lock);
656         }
657         read_unlock(&obd_dev_lock);
658         return rc;
659 }
660 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
661
662 void obd_cleanup_caches(void)
663 {
664         ENTRY;
665         if (obd_device_cachep) {
666                 kmem_cache_destroy(obd_device_cachep);
667                 obd_device_cachep = NULL;
668         }
669         if (obdo_cachep) {
670                 kmem_cache_destroy(obdo_cachep);
671                 obdo_cachep = NULL;
672         }
673         if (import_cachep) {
674                 kmem_cache_destroy(import_cachep);
675                 import_cachep = NULL;
676         }
677         if (capa_cachep) {
678                 kmem_cache_destroy(capa_cachep);
679                 capa_cachep = NULL;
680         }
681         EXIT;
682 }
683
684 int obd_init_caches(void)
685 {
686         int rc;
687         ENTRY;
688
689         LASSERT(obd_device_cachep == NULL);
690         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
691                                               sizeof(struct obd_device),
692                                               0, 0, NULL);
693         if (!obd_device_cachep)
694                 GOTO(out, rc = -ENOMEM);
695
696         LASSERT(obdo_cachep == NULL);
697         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
698                                         0, 0, NULL);
699         if (!obdo_cachep)
700                 GOTO(out, rc = -ENOMEM);
701
702         LASSERT(import_cachep == NULL);
703         import_cachep = kmem_cache_create("ll_import_cache",
704                                           sizeof(struct obd_import),
705                                           0, 0, NULL);
706         if (!import_cachep)
707                 GOTO(out, rc = -ENOMEM);
708
709         LASSERT(capa_cachep == NULL);
710         capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
711                                         0, 0, NULL);
712         if (!capa_cachep)
713                 GOTO(out, rc = -ENOMEM);
714
715         RETURN(0);
716 out:
717         obd_cleanup_caches();
718         RETURN(rc);
719 }
720
721 /* map connection to client */
722 struct obd_export *class_conn2export(struct lustre_handle *conn)
723 {
724         struct obd_export *export;
725         ENTRY;
726
727         if (!conn) {
728                 CDEBUG(D_CACHE, "looking for null handle\n");
729                 RETURN(NULL);
730         }
731
732         if (conn->cookie == -1) {  /* this means assign a new connection */
733                 CDEBUG(D_CACHE, "want a new connection\n");
734                 RETURN(NULL);
735         }
736
737         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
738         export = class_handle2object(conn->cookie, NULL);
739         RETURN(export);
740 }
741 EXPORT_SYMBOL(class_conn2export);
742
743 struct obd_device *class_exp2obd(struct obd_export *exp)
744 {
745         if (exp)
746                 return exp->exp_obd;
747         return NULL;
748 }
749 EXPORT_SYMBOL(class_exp2obd);
750
751 struct obd_device *class_conn2obd(struct lustre_handle *conn)
752 {
753         struct obd_export *export;
754         export = class_conn2export(conn);
755         if (export) {
756                 struct obd_device *obd = export->exp_obd;
757                 class_export_put(export);
758                 return obd;
759         }
760         return NULL;
761 }
762 EXPORT_SYMBOL(class_conn2obd);
763
764 struct obd_import *class_exp2cliimp(struct obd_export *exp)
765 {
766         struct obd_device *obd = exp->exp_obd;
767         if (obd == NULL)
768                 return NULL;
769         return obd->u.cli.cl_import;
770 }
771 EXPORT_SYMBOL(class_exp2cliimp);
772
773 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
774 {
775         struct obd_device *obd = class_conn2obd(conn);
776         if (obd == NULL)
777                 return NULL;
778         return obd->u.cli.cl_import;
779 }
780 EXPORT_SYMBOL(class_conn2cliimp);
781
782 /* Export management functions */
783 static void class_export_destroy(struct obd_export *exp)
784 {
785         struct obd_device *obd = exp->exp_obd;
786         ENTRY;
787
788         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
789         LASSERT(obd != NULL);
790
791         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
792                exp->exp_client_uuid.uuid, obd->obd_name);
793
794         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
795         if (exp->exp_connection)
796                 ptlrpc_put_connection_superhack(exp->exp_connection);
797
798         LASSERT(list_empty(&exp->exp_outstanding_replies));
799         LASSERT(list_empty(&exp->exp_uncommitted_replies));
800         LASSERT(list_empty(&exp->exp_req_replay_queue));
801         LASSERT(list_empty(&exp->exp_hp_rpcs));
802         obd_destroy_export(exp);
803         class_decref(obd, "export", exp);
804
805         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
806         EXIT;
807 }
808
/* hop_addref callback of export_handle_ops: takes a reference on the export
 * passed as an opaque pointer. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
813
814 static struct portals_handle_ops export_handle_ops = {
815         .hop_addref = export_handle_addref,
816         .hop_free   = NULL,
817 };
818
819 struct obd_export *class_export_get(struct obd_export *exp)
820 {
821         atomic_inc(&exp->exp_refcount);
822         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
823                atomic_read(&exp->exp_refcount));
824         return exp;
825 }
826 EXPORT_SYMBOL(class_export_get);
827
828 void class_export_put(struct obd_export *exp)
829 {
830         LASSERT(exp != NULL);
831         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
832         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
833                atomic_read(&exp->exp_refcount) - 1);
834
835         if (atomic_dec_and_test(&exp->exp_refcount)) {
836                 LASSERT(!list_empty(&exp->exp_obd_chain));
837                 CDEBUG(D_IOCTL, "final put %p/%s\n",
838                        exp, exp->exp_client_uuid.uuid);
839
840                 /* release nid stat refererence */
841                 lprocfs_exp_cleanup(exp);
842
843                 obd_zombie_export_add(exp);
844         }
845 }
846 EXPORT_SYMBOL(class_export_put);
847
848 /* Creates a new export, adds it to the hash table, and returns a
849  * pointer to it. The refcount is 2: one for the hash reference, and
850  * one for the pointer returned by this function. */
851 struct obd_export *class_new_export(struct obd_device *obd,
852                                     struct obd_uuid *cluuid)
853 {
854         struct obd_export *export;
855         cfs_hash_t *hash = NULL;
856         int rc = 0;
857         ENTRY;
858
859         OBD_ALLOC_PTR(export);
860         if (!export)
861                 return ERR_PTR(-ENOMEM);
862
863         export->exp_conn_cnt = 0;
864         export->exp_lock_hash = NULL;
865         export->exp_flock_hash = NULL;
866         atomic_set(&export->exp_refcount, 2);
867         atomic_set(&export->exp_rpc_count, 0);
868         atomic_set(&export->exp_cb_count, 0);
869         atomic_set(&export->exp_locks_count, 0);
870 #if LUSTRE_TRACKS_LOCK_EXP_REFS
871         INIT_LIST_HEAD(&export->exp_locks_list);
872         spin_lock_init(&export->exp_locks_list_guard);
873 #endif
874         atomic_set(&export->exp_replay_count, 0);
875         export->exp_obd = obd;
876         INIT_LIST_HEAD(&export->exp_outstanding_replies);
877         spin_lock_init(&export->exp_uncommitted_replies_lock);
878         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
879         INIT_LIST_HEAD(&export->exp_req_replay_queue);
880         INIT_LIST_HEAD(&export->exp_handle.h_link);
881         INIT_LIST_HEAD(&export->exp_hp_rpcs);
882         INIT_LIST_HEAD(&export->exp_reg_rpcs);
883         class_handle_hash(&export->exp_handle, &export_handle_ops);
884         export->exp_last_request_time = cfs_time_current_sec();
885         spin_lock_init(&export->exp_lock);
886         spin_lock_init(&export->exp_rpc_lock);
887         INIT_HLIST_NODE(&export->exp_uuid_hash);
888         INIT_HLIST_NODE(&export->exp_nid_hash);
889         spin_lock_init(&export->exp_bl_list_lock);
890         INIT_LIST_HEAD(&export->exp_bl_list);
891
892         export->exp_sp_peer = LUSTRE_SP_ANY;
893         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
894         export->exp_client_uuid = *cluuid;
895         obd_init_export(export);
896
897         spin_lock(&obd->obd_dev_lock);
898         /* shouldn't happen, but might race */
899         if (obd->obd_stopping)
900                 GOTO(exit_unlock, rc = -ENODEV);
901
902         hash = cfs_hash_getref(obd->obd_uuid_hash);
903         if (hash == NULL)
904                 GOTO(exit_unlock, rc = -ENODEV);
905         spin_unlock(&obd->obd_dev_lock);
906
907         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
908                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
909                 if (rc != 0) {
910                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
911                                       obd->obd_name, cluuid->uuid, rc);
912                         GOTO(exit_err, rc = -EALREADY);
913                 }
914         }
915
916         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
917         spin_lock(&obd->obd_dev_lock);
918         if (obd->obd_stopping) {
919                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
920                 GOTO(exit_unlock, rc = -ENODEV);
921         }
922
923         class_incref(obd, "export", export);
924         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
925         list_add_tail(&export->exp_obd_chain_timed,
926                       &export->exp_obd->obd_exports_timed);
927         export->exp_obd->obd_num_exports++;
928         spin_unlock(&obd->obd_dev_lock);
929         cfs_hash_putref(hash);
930         RETURN(export);
931
932 exit_unlock:
933         spin_unlock(&obd->obd_dev_lock);
934 exit_err:
935         if (hash)
936                 cfs_hash_putref(hash);
937         class_handle_unhash(&export->exp_handle);
938         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
939         obd_destroy_export(export);
940         OBD_FREE_PTR(export);
941         return ERR_PTR(rc);
942 }
943 EXPORT_SYMBOL(class_new_export);
944
945 void class_unlink_export(struct obd_export *exp)
946 {
947         class_handle_unhash(&exp->exp_handle);
948
949         spin_lock(&exp->exp_obd->obd_dev_lock);
950         /* delete an uuid-export hashitem from hashtables */
951         if (!hlist_unhashed(&exp->exp_uuid_hash))
952                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
953                              &exp->exp_client_uuid,
954                              &exp->exp_uuid_hash);
955
956         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
957         list_del_init(&exp->exp_obd_chain_timed);
958         exp->exp_obd->obd_num_exports--;
959         spin_unlock(&exp->exp_obd->obd_dev_lock);
960         class_export_put(exp);
961 }
962 EXPORT_SYMBOL(class_unlink_export);
963
964 /* Import management functions */
965 void class_import_destroy(struct obd_import *imp)
966 {
967         ENTRY;
968
969         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
970                 imp->imp_obd->obd_name);
971
972         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
973
974         ptlrpc_put_connection_superhack(imp->imp_connection);
975
976         while (!list_empty(&imp->imp_conn_list)) {
977                 struct obd_import_conn *imp_conn;
978
979                 imp_conn = list_entry(imp->imp_conn_list.next,
980                                       struct obd_import_conn, oic_item);
981                 list_del_init(&imp_conn->oic_item);
982                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
983                 OBD_FREE(imp_conn, sizeof(*imp_conn));
984         }
985
986         LASSERT(imp->imp_sec == NULL);
987         class_decref(imp->imp_obd, "import", imp);
988         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
989         EXIT;
990 }
991
/* hop_addref callback for import handles: take a reference on the import. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
996
997 static struct portals_handle_ops import_handle_ops = {
998         .hop_addref = import_handle_addref,
999         .hop_free   = NULL,
1000 };
1001
1002 struct obd_import *class_import_get(struct obd_import *import)
1003 {
1004         atomic_inc(&import->imp_refcount);
1005         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1006                atomic_read(&import->imp_refcount),
1007                import->imp_obd->obd_name);
1008         return import;
1009 }
1010 EXPORT_SYMBOL(class_import_get);
1011
1012 void class_import_put(struct obd_import *imp)
1013 {
1014         ENTRY;
1015
1016         LASSERT(list_empty(&imp->imp_zombie_chain));
1017         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1018
1019         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1020                atomic_read(&imp->imp_refcount) - 1,
1021                imp->imp_obd->obd_name);
1022
1023         if (atomic_dec_and_test(&imp->imp_refcount)) {
1024                 CDEBUG(D_INFO, "final put import %p\n", imp);
1025                 obd_zombie_import_add(imp);
1026         }
1027
1028         /* catch possible import put race */
1029         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1030         EXIT;
1031 }
1032 EXPORT_SYMBOL(class_import_put);
1033
1034 static void init_imp_at(struct imp_at *at) {
1035         int i;
1036         at_init(&at->iat_net_latency, 0, 0);
1037         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1038                 /* max service estimates are tracked on the server side, so
1039                    don't use the AT history here, just use the last reported
1040                    val. (But keep hist for proc histogram, worst_ever) */
1041                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1042                         AT_FLG_NOHIST);
1043         }
1044 }
1045
1046 struct obd_import *class_new_import(struct obd_device *obd)
1047 {
1048         struct obd_import *imp;
1049
1050         OBD_ALLOC(imp, sizeof(*imp));
1051         if (imp == NULL)
1052                 return NULL;
1053
1054         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1055         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1056         INIT_LIST_HEAD(&imp->imp_replay_list);
1057         INIT_LIST_HEAD(&imp->imp_sending_list);
1058         INIT_LIST_HEAD(&imp->imp_delayed_list);
1059         INIT_LIST_HEAD(&imp->imp_committed_list);
1060         imp->imp_replay_cursor = &imp->imp_committed_list;
1061         spin_lock_init(&imp->imp_lock);
1062         imp->imp_last_success_conn = 0;
1063         imp->imp_state = LUSTRE_IMP_NEW;
1064         imp->imp_obd = class_incref(obd, "import", imp);
1065         mutex_init(&imp->imp_sec_mutex);
1066         init_waitqueue_head(&imp->imp_recovery_waitq);
1067
1068         atomic_set(&imp->imp_refcount, 2);
1069         atomic_set(&imp->imp_unregistering, 0);
1070         atomic_set(&imp->imp_inflight, 0);
1071         atomic_set(&imp->imp_replay_inflight, 0);
1072         atomic_set(&imp->imp_inval_count, 0);
1073         INIT_LIST_HEAD(&imp->imp_conn_list);
1074         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1075         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1076         init_imp_at(&imp->imp_at);
1077
1078         /* the default magic is V2, will be used in connect RPC, and
1079          * then adjusted according to the flags in request/reply. */
1080         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1081
1082         return imp;
1083 }
1084 EXPORT_SYMBOL(class_new_import);
1085
1086 void class_destroy_import(struct obd_import *import)
1087 {
1088         LASSERT(import != NULL);
1089         LASSERT(import != LP_POISON);
1090
1091         class_handle_unhash(&import->imp_handle);
1092
1093         spin_lock(&import->imp_lock);
1094         import->imp_generation++;
1095         spin_unlock(&import->imp_lock);
1096         class_import_put(import);
1097 }
1098 EXPORT_SYMBOL(class_destroy_import);
1099
1100 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1101
1102 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1103 {
1104         spin_lock(&exp->exp_locks_list_guard);
1105
1106         LASSERT(lock->l_exp_refs_nr >= 0);
1107
1108         if (lock->l_exp_refs_target != NULL &&
1109             lock->l_exp_refs_target != exp) {
1110                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1111                               exp, lock, lock->l_exp_refs_target);
1112         }
1113         if ((lock->l_exp_refs_nr ++) == 0) {
1114                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1115                 lock->l_exp_refs_target = exp;
1116         }
1117         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1118                lock, exp, lock->l_exp_refs_nr);
1119         spin_unlock(&exp->exp_locks_list_guard);
1120 }
1121 EXPORT_SYMBOL(__class_export_add_lock_ref);
1122
1123 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1124 {
1125         spin_lock(&exp->exp_locks_list_guard);
1126         LASSERT(lock->l_exp_refs_nr > 0);
1127         if (lock->l_exp_refs_target != exp) {
1128                 LCONSOLE_WARN("lock %p, "
1129                               "mismatching export pointers: %p, %p\n",
1130                               lock, lock->l_exp_refs_target, exp);
1131         }
1132         if (-- lock->l_exp_refs_nr == 0) {
1133                 list_del_init(&lock->l_exp_refs_link);
1134                 lock->l_exp_refs_target = NULL;
1135         }
1136         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1137                lock, exp, lock->l_exp_refs_nr);
1138         spin_unlock(&exp->exp_locks_list_guard);
1139 }
1140 EXPORT_SYMBOL(__class_export_del_lock_ref);
1141 #endif
1142
1143 /* A connection defines an export context in which preallocation can
1144    be managed. This releases the export pointer reference, and returns
1145    the export handle, so the export refcount is 1 when this function
1146    returns. */
1147 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1148                   struct obd_uuid *cluuid)
1149 {
1150         struct obd_export *export;
1151         LASSERT(conn != NULL);
1152         LASSERT(obd != NULL);
1153         LASSERT(cluuid != NULL);
1154         ENTRY;
1155
1156         export = class_new_export(obd, cluuid);
1157         if (IS_ERR(export))
1158                 RETURN(PTR_ERR(export));
1159
1160         conn->cookie = export->exp_handle.h_cookie;
1161         class_export_put(export);
1162
1163         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1164                cluuid->uuid, conn->cookie);
1165         RETURN(0);
1166 }
1167 EXPORT_SYMBOL(class_connect);
1168
1169 /* if export is involved in recovery then clean up related things */
1170 void class_export_recovery_cleanup(struct obd_export *exp)
1171 {
1172         struct obd_device *obd = exp->exp_obd;
1173
1174         spin_lock(&obd->obd_recovery_task_lock);
1175         if (obd->obd_recovering) {
1176                 if (exp->exp_in_recovery) {
1177                         spin_lock(&exp->exp_lock);
1178                         exp->exp_in_recovery = 0;
1179                         spin_unlock(&exp->exp_lock);
1180                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1181                         atomic_dec(&obd->obd_connected_clients);
1182                 }
1183
1184                 /* if called during recovery then should update
1185                  * obd_stale_clients counter,
1186                  * lightweight exports are not counted */
1187                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1188                         exp->exp_obd->obd_stale_clients++;
1189         }
1190         spin_unlock(&obd->obd_recovery_task_lock);
1191
1192         spin_lock(&exp->exp_lock);
1193         /** Cleanup req replay fields */
1194         if (exp->exp_req_replay_needed) {
1195                 exp->exp_req_replay_needed = 0;
1196
1197                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1198                 atomic_dec(&obd->obd_req_replay_clients);
1199         }
1200
1201         /** Cleanup lock replay data */
1202         if (exp->exp_lock_replay_needed) {
1203                 exp->exp_lock_replay_needed = 0;
1204
1205                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1206                 atomic_dec(&obd->obd_lock_replay_clients);
1207         }
1208         spin_unlock(&exp->exp_lock);
1209 }
1210
1211 /* This function removes 1-3 references from the export:
1212  * 1 - for export pointer passed
1213  * and if disconnect really need
1214  * 2 - removing from hash
1215  * 3 - in client_unlink_export
1216  * The export pointer passed to this function can destroyed */
1217 int class_disconnect(struct obd_export *export)
1218 {
1219         int already_disconnected;
1220         ENTRY;
1221
1222         if (export == NULL) {
1223                 CWARN("attempting to free NULL export %p\n", export);
1224                 RETURN(-EINVAL);
1225         }
1226
1227         spin_lock(&export->exp_lock);
1228         already_disconnected = export->exp_disconnected;
1229         export->exp_disconnected = 1;
1230         spin_unlock(&export->exp_lock);
1231
1232         /* class_cleanup(), abort_recovery(), and class_fail_export()
1233          * all end up in here, and if any of them race we shouldn't
1234          * call extra class_export_puts(). */
1235         if (already_disconnected) {
1236                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1237                 GOTO(no_disconn, already_disconnected);
1238         }
1239
1240         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1241                export->exp_handle.h_cookie);
1242
1243         if (!hlist_unhashed(&export->exp_nid_hash))
1244                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1245                              &export->exp_connection->c_peer.nid,
1246                              &export->exp_nid_hash);
1247
1248         class_export_recovery_cleanup(export);
1249         class_unlink_export(export);
1250 no_disconn:
1251         class_export_put(export);
1252         RETURN(0);
1253 }
1254 EXPORT_SYMBOL(class_disconnect);
1255
1256 /* Return non-zero for a fully connected export */
1257 int class_connected_export(struct obd_export *exp)
1258 {
1259         int connected = 0;
1260
1261         if (exp) {
1262                 spin_lock(&exp->exp_lock);
1263                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1264                 spin_unlock(&exp->exp_lock);
1265         }
1266         return connected;
1267 }
1268 EXPORT_SYMBOL(class_connected_export);
1269
1270 static void class_disconnect_export_list(struct list_head *list,
1271                                          enum obd_option flags)
1272 {
1273         int rc;
1274         struct obd_export *exp;
1275         ENTRY;
1276
1277         /* It's possible that an export may disconnect itself, but
1278          * nothing else will be added to this list. */
1279         while (!list_empty(list)) {
1280                 exp = list_entry(list->next, struct obd_export,
1281                                  exp_obd_chain);
1282                 /* need for safe call CDEBUG after obd_disconnect */
1283                 class_export_get(exp);
1284
1285                 spin_lock(&exp->exp_lock);
1286                 exp->exp_flags = flags;
1287                 spin_unlock(&exp->exp_lock);
1288
1289                 if (obd_uuid_equals(&exp->exp_client_uuid,
1290                                     &exp->exp_obd->obd_uuid)) {
1291                         CDEBUG(D_HA,
1292                                "exp %p export uuid == obd uuid, don't discon\n",
1293                                exp);
1294                         /* Need to delete this now so we don't end up pointing
1295                          * to work_list later when this export is cleaned up. */
1296                         list_del_init(&exp->exp_obd_chain);
1297                         class_export_put(exp);
1298                         continue;
1299                 }
1300
1301                 class_export_get(exp);
1302                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1303                        "last request at "CFS_TIME_T"\n",
1304                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1305                        exp, exp->exp_last_request_time);
1306                 /* release one export reference anyway */
1307                 rc = obd_disconnect(exp);
1308
1309                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1310                        obd_export_nid2str(exp), exp, rc);
1311                 class_export_put(exp);
1312         }
1313         EXIT;
1314 }
1315
1316 void class_disconnect_exports(struct obd_device *obd)
1317 {
1318         struct list_head work_list;
1319         ENTRY;
1320
1321         /* Move all of the exports from obd_exports to a work list, en masse. */
1322         INIT_LIST_HEAD(&work_list);
1323         spin_lock(&obd->obd_dev_lock);
1324         list_splice_init(&obd->obd_exports, &work_list);
1325         list_splice_init(&obd->obd_delayed_exports, &work_list);
1326         spin_unlock(&obd->obd_dev_lock);
1327
1328         if (!list_empty(&work_list)) {
1329                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1330                        "disconnecting them\n", obd->obd_minor, obd);
1331                 class_disconnect_export_list(&work_list,
1332                                              exp_flags_from_obd(obd));
1333         } else
1334                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1335                        obd->obd_minor, obd);
1336         EXIT;
1337 }
1338 EXPORT_SYMBOL(class_disconnect_exports);
1339
1340 /* Remove exports that have not completed recovery.
1341  */
1342 void class_disconnect_stale_exports(struct obd_device *obd,
1343                                     int (*test_export)(struct obd_export *))
1344 {
1345         struct list_head work_list;
1346         struct obd_export *exp, *n;
1347         int evicted = 0;
1348         ENTRY;
1349
1350         INIT_LIST_HEAD(&work_list);
1351         spin_lock(&obd->obd_dev_lock);
1352         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1353                                  exp_obd_chain) {
1354                 /* don't count self-export as client */
1355                 if (obd_uuid_equals(&exp->exp_client_uuid,
1356                                     &exp->exp_obd->obd_uuid))
1357                         continue;
1358
1359                 /* don't evict clients which have no slot in last_rcvd
1360                  * (e.g. lightweight connection) */
1361                 if (exp->exp_target_data.ted_lr_idx == -1)
1362                         continue;
1363
1364                 spin_lock(&exp->exp_lock);
1365                 if (exp->exp_failed || test_export(exp)) {
1366                         spin_unlock(&exp->exp_lock);
1367                         continue;
1368                 }
1369                 exp->exp_failed = 1;
1370                 spin_unlock(&exp->exp_lock);
1371
1372                 list_move(&exp->exp_obd_chain, &work_list);
1373                 evicted++;
1374                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1375                        obd->obd_name, exp->exp_client_uuid.uuid,
1376                        exp->exp_connection == NULL ? "<unknown>" :
1377                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1378                 print_export_data(exp, "EVICTING", 0);
1379         }
1380         spin_unlock(&obd->obd_dev_lock);
1381
1382         if (evicted)
1383                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1384                               obd->obd_name, evicted);
1385
1386         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1387                                                  OBD_OPT_ABORT_RECOV);
1388         EXIT;
1389 }
1390 EXPORT_SYMBOL(class_disconnect_stale_exports);
1391
1392 void class_fail_export(struct obd_export *exp)
1393 {
1394         int rc, already_failed;
1395
1396         spin_lock(&exp->exp_lock);
1397         already_failed = exp->exp_failed;
1398         exp->exp_failed = 1;
1399         spin_unlock(&exp->exp_lock);
1400
1401         if (already_failed) {
1402                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1403                        exp, exp->exp_client_uuid.uuid);
1404                 return;
1405         }
1406
1407         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1408                exp, exp->exp_client_uuid.uuid);
1409
1410         if (obd_dump_on_timeout)
1411                 libcfs_debug_dumplog();
1412
1413         /* need for safe call CDEBUG after obd_disconnect */
1414         class_export_get(exp);
1415
1416         /* Most callers into obd_disconnect are removing their own reference
1417          * (request, for example) in addition to the one from the hash table.
1418          * We don't have such a reference here, so make one. */
1419         class_export_get(exp);
1420         rc = obd_disconnect(exp);
1421         if (rc)
1422                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1423         else
1424                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1425                        exp, exp->exp_client_uuid.uuid);
1426         class_export_put(exp);
1427 }
1428 EXPORT_SYMBOL(class_fail_export);
1429
1430 char *obd_export_nid2str(struct obd_export *exp)
1431 {
1432         if (exp->exp_connection != NULL)
1433                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1434
1435         return "(no nid)";
1436 }
1437 EXPORT_SYMBOL(obd_export_nid2str);
1438
1439 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1440 {
1441         cfs_hash_t *nid_hash;
1442         struct obd_export *doomed_exp = NULL;
1443         int exports_evicted = 0;
1444
1445         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1446
1447         spin_lock(&obd->obd_dev_lock);
1448         /* umount has run already, so evict thread should leave
1449          * its task to umount thread now */
1450         if (obd->obd_stopping) {
1451                 spin_unlock(&obd->obd_dev_lock);
1452                 return exports_evicted;
1453         }
1454         nid_hash = obd->obd_nid_hash;
1455         cfs_hash_getref(nid_hash);
1456         spin_unlock(&obd->obd_dev_lock);
1457
1458         do {
1459                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1460                 if (doomed_exp == NULL)
1461                         break;
1462
1463                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1464                          "nid %s found, wanted nid %s, requested nid %s\n",
1465                          obd_export_nid2str(doomed_exp),
1466                          libcfs_nid2str(nid_key), nid);
1467                 LASSERTF(doomed_exp != obd->obd_self_export,
1468                          "self-export is hashed by NID?\n");
1469                 exports_evicted++;
1470                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1471                               "request\n", obd->obd_name,
1472                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1473                               obd_export_nid2str(doomed_exp));
1474                 class_fail_export(doomed_exp);
1475                 class_export_put(doomed_exp);
1476         } while (1);
1477
1478         cfs_hash_putref(nid_hash);
1479
1480         if (!exports_evicted)
1481                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1482                        obd->obd_name, nid);
1483         return exports_evicted;
1484 }
1485 EXPORT_SYMBOL(obd_export_evict_by_nid);
1486
1487 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1488 {
1489         cfs_hash_t *uuid_hash;
1490         struct obd_export *doomed_exp = NULL;
1491         struct obd_uuid doomed_uuid;
1492         int exports_evicted = 0;
1493
1494         spin_lock(&obd->obd_dev_lock);
1495         if (obd->obd_stopping) {
1496                 spin_unlock(&obd->obd_dev_lock);
1497                 return exports_evicted;
1498         }
1499         uuid_hash = obd->obd_uuid_hash;
1500         cfs_hash_getref(uuid_hash);
1501         spin_unlock(&obd->obd_dev_lock);
1502
1503         obd_str2uuid(&doomed_uuid, uuid);
1504         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1505                 CERROR("%s: can't evict myself\n", obd->obd_name);
1506                 cfs_hash_putref(uuid_hash);
1507                 return exports_evicted;
1508         }
1509
1510         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1511
1512         if (doomed_exp == NULL) {
1513                 CERROR("%s: can't disconnect %s: no exports found\n",
1514                        obd->obd_name, uuid);
1515         } else {
1516                 CWARN("%s: evicting %s at adminstrative request\n",
1517                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1518                 class_fail_export(doomed_exp);
1519                 class_export_put(doomed_exp);
1520                 exports_evicted++;
1521         }
1522         cfs_hash_putref(uuid_hash);
1523
1524         return exports_evicted;
1525 }
1526 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1527
1528 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback invoked by print_export_data() when lock dumping is
 * requested; unset (NULL) until some other code assigns it. */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
1531 #endif
1532
1533 static void print_export_data(struct obd_export *exp, const char *status,
1534                               int locks)
1535 {
1536         struct ptlrpc_reply_state *rs;
1537         struct ptlrpc_reply_state *first_reply = NULL;
1538         int nreplies = 0;
1539
1540         spin_lock(&exp->exp_lock);
1541         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1542                             rs_exp_list) {
1543                 if (nreplies == 0)
1544                         first_reply = rs;
1545                 nreplies++;
1546         }
1547         spin_unlock(&exp->exp_lock);
1548
1549         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1550                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1551                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1552                atomic_read(&exp->exp_rpc_count),
1553                atomic_read(&exp->exp_cb_count),
1554                atomic_read(&exp->exp_locks_count),
1555                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1556                nreplies, first_reply, nreplies > 3 ? "..." : "",
1557                exp->exp_last_committed);
1558 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1559         if (locks && class_export_dump_hook != NULL)
1560                 class_export_dump_hook(exp);
1561 #endif
1562 }
1563
1564 void dump_exports(struct obd_device *obd, int locks)
1565 {
1566         struct obd_export *exp;
1567
1568         spin_lock(&obd->obd_dev_lock);
1569         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1570                 print_export_data(exp, "ACTIVE", locks);
1571         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1572                 print_export_data(exp, "UNLINKED", locks);
1573         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1574                 print_export_data(exp, "DELAYED", locks);
1575         spin_unlock(&obd->obd_dev_lock);
1576         spin_lock(&obd_zombie_impexp_lock);
1577         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1578                 print_export_data(exp, "ZOMBIE", locks);
1579         spin_unlock(&obd_zombie_impexp_lock);
1580 }
1581 EXPORT_SYMBOL(dump_exports);
1582
1583 void obd_exports_barrier(struct obd_device *obd)
1584 {
1585         int waited = 2;
1586         LASSERT(list_empty(&obd->obd_exports));
1587         spin_lock(&obd->obd_dev_lock);
1588         while (!list_empty(&obd->obd_unlinked_exports)) {
1589                 spin_unlock(&obd->obd_dev_lock);
1590                 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1591                                                    cfs_time_seconds(waited));
1592                 if (waited > 5 && IS_PO2(waited)) {
1593                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1594                                       "more than %d seconds. "
1595                                       "The obd refcount = %d. Is it stuck?\n",
1596                                       obd->obd_name, waited,
1597                                       atomic_read(&obd->obd_refcount));
1598                         dump_exports(obd, 1);
1599                 }
1600                 waited *= 2;
1601                 spin_lock(&obd->obd_dev_lock);
1602         }
1603         spin_unlock(&obd->obd_dev_lock);
1604 }
1605 EXPORT_SYMBOL(obd_exports_barrier);
1606
/* Number of zombie imports and exports still waiting to be destroyed;
 * starts at zero and is updated under obd_zombie_impexp_lock. */
static int zombies_count;
1609
1610 /**
1611  * kill zombie imports and exports
1612  */
1613 void obd_zombie_impexp_cull(void)
1614 {
1615         struct obd_import *import;
1616         struct obd_export *export;
1617         ENTRY;
1618
1619         do {
1620                 spin_lock(&obd_zombie_impexp_lock);
1621
1622                 import = NULL;
1623                 if (!list_empty(&obd_zombie_imports)) {
1624                         import = list_entry(obd_zombie_imports.next,
1625                                             struct obd_import,
1626                                             imp_zombie_chain);
1627                         list_del_init(&import->imp_zombie_chain);
1628                 }
1629
1630                 export = NULL;
1631                 if (!list_empty(&obd_zombie_exports)) {
1632                         export = list_entry(obd_zombie_exports.next,
1633                                             struct obd_export,
1634                                             exp_obd_chain);
1635                         list_del_init(&export->exp_obd_chain);
1636                 }
1637
1638                 spin_unlock(&obd_zombie_impexp_lock);
1639
1640                 if (import != NULL) {
1641                         class_import_destroy(import);
1642                         spin_lock(&obd_zombie_impexp_lock);
1643                         zombies_count--;
1644                         spin_unlock(&obd_zombie_impexp_lock);
1645                 }
1646
1647                 if (export != NULL) {
1648                         class_export_destroy(export);
1649                         spin_lock(&obd_zombie_impexp_lock);
1650                         zombies_count--;
1651                         spin_unlock(&obd_zombie_impexp_lock);
1652                 }
1653
1654                 cond_resched();
1655         } while (import != NULL || export != NULL);
1656         EXIT;
1657 }
1658
1659 static struct completion        obd_zombie_start;
1660 static struct completion        obd_zombie_stop;
1661 static unsigned long            obd_zombie_flags;
1662 static wait_queue_head_t        obd_zombie_waitq;
1663 static pid_t                    obd_zombie_pid;
1664
1665 enum {
1666         OBD_ZOMBIE_STOP         = 0x0001,
1667 };
1668
1669 /**
1670  * check for work for kill zombie import/export thread.
1671  */
1672 static int obd_zombie_impexp_check(void *arg)
1673 {
1674         int rc;
1675
1676         spin_lock(&obd_zombie_impexp_lock);
1677         rc = (zombies_count == 0) &&
1678              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1679         spin_unlock(&obd_zombie_impexp_lock);
1680
1681         RETURN(rc);
1682 }
1683
1684 /**
1685  * Add export to the obd_zombe thread and notify it.
1686  */
1687 static void obd_zombie_export_add(struct obd_export *exp) {
1688         spin_lock(&exp->exp_obd->obd_dev_lock);
1689         LASSERT(!list_empty(&exp->exp_obd_chain));
1690         list_del_init(&exp->exp_obd_chain);
1691         spin_unlock(&exp->exp_obd->obd_dev_lock);
1692         spin_lock(&obd_zombie_impexp_lock);
1693         zombies_count++;
1694         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1695         spin_unlock(&obd_zombie_impexp_lock);
1696
1697         obd_zombie_impexp_notify();
1698 }
1699
1700 /**
1701  * Add import to the obd_zombe thread and notify it.
1702  */
1703 static void obd_zombie_import_add(struct obd_import *imp) {
1704         LASSERT(imp->imp_sec == NULL);
1705         LASSERT(imp->imp_rq_pool == NULL);
1706         spin_lock(&obd_zombie_impexp_lock);
1707         LASSERT(list_empty(&imp->imp_zombie_chain));
1708         zombies_count++;
1709         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1710         spin_unlock(&obd_zombie_impexp_lock);
1711
1712         obd_zombie_impexp_notify();
1713 }
1714
1715 /**
1716  * notify import/export destroy thread about new zombie.
1717  */
1718 static void obd_zombie_impexp_notify(void)
1719 {
1720         /*
1721          * Make sure obd_zomebie_impexp_thread get this notification.
1722          * It is possible this signal only get by obd_zombie_barrier, and
1723          * barrier gulps this notification and sleeps away and hangs ensues
1724          */
1725         wake_up_all(&obd_zombie_waitq);
1726 }
1727
1728 /**
1729  * check whether obd_zombie is idle
1730  */
1731 static int obd_zombie_is_idle(void)
1732 {
1733         int rc;
1734
1735         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1736         spin_lock(&obd_zombie_impexp_lock);
1737         rc = (zombies_count == 0);
1738         spin_unlock(&obd_zombie_impexp_lock);
1739         return rc;
1740 }
1741
1742 /**
1743  * wait when obd_zombie import/export queues become empty
1744  */
1745 void obd_zombie_barrier(void)
1746 {
1747         struct l_wait_info lwi = { 0 };
1748
1749         if (obd_zombie_pid == current_pid())
1750                 /* don't wait for myself */
1751                 return;
1752         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1753 }
1754 EXPORT_SYMBOL(obd_zombie_barrier);
1755
1756
1757 /**
1758  * destroy zombie export/import thread.
1759  */
1760 static int obd_zombie_impexp_thread(void *unused)
1761 {
1762         unshare_fs_struct();
1763         complete(&obd_zombie_start);
1764
1765         obd_zombie_pid = current_pid();
1766
1767         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1768                 struct l_wait_info lwi = { 0 };
1769
1770                 l_wait_event(obd_zombie_waitq,
1771                              !obd_zombie_impexp_check(NULL), &lwi);
1772                 obd_zombie_impexp_cull();
1773
1774                 /*
1775                  * Notify obd_zombie_barrier callers that queues
1776                  * may be empty.
1777                  */
1778                 wake_up(&obd_zombie_waitq);
1779         }
1780
1781         complete(&obd_zombie_stop);
1782
1783         RETURN(0);
1784 }
1785
1786
1787 /**
1788  * start destroy zombie import/export thread
1789  */
1790 int obd_zombie_impexp_init(void)
1791 {
1792         struct task_struct *task;
1793
1794         INIT_LIST_HEAD(&obd_zombie_imports);
1795
1796         INIT_LIST_HEAD(&obd_zombie_exports);
1797         spin_lock_init(&obd_zombie_impexp_lock);
1798         init_completion(&obd_zombie_start);
1799         init_completion(&obd_zombie_stop);
1800         init_waitqueue_head(&obd_zombie_waitq);
1801         obd_zombie_pid = 0;
1802
1803         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1804         if (IS_ERR(task))
1805                 RETURN(PTR_ERR(task));
1806
1807         wait_for_completion(&obd_zombie_start);
1808         RETURN(0);
1809 }
1810 /**
1811  * stop destroy zombie import/export thread
1812  */
1813 void obd_zombie_impexp_stop(void)
1814 {
1815         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1816         obd_zombie_impexp_notify();
1817         wait_for_completion(&obd_zombie_stop);
1818 }
1819
1820 /***** Kernel-userspace comm helpers *******/
1821
1822 /* Get length of entire message, including header */
1823 int kuc_len(int payload_len)
1824 {
1825         return sizeof(struct kuc_hdr) + payload_len;
1826 }
1827 EXPORT_SYMBOL(kuc_len);
1828
1829 /* Get a pointer to kuc header, given a ptr to the payload
1830  * @param p Pointer to payload area
1831  * @returns Pointer to kuc header
1832  */
1833 struct kuc_hdr * kuc_ptr(void *p)
1834 {
1835         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1836         LASSERT(lh->kuc_magic == KUC_MAGIC);
1837         return lh;
1838 }
1839 EXPORT_SYMBOL(kuc_ptr);
1840
1841 /* Test if payload is part of kuc message
1842  * @param p Pointer to payload area
1843  * @returns boolean
1844  */
1845 int kuc_ispayload(void *p)
1846 {
1847         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1848
1849         if (kh->kuc_magic == KUC_MAGIC)
1850                 return 1;
1851         else
1852                 return 0;
1853 }
1854 EXPORT_SYMBOL(kuc_ispayload);
1855
1856 /* Alloc space for a message, and fill in header
1857  * @return Pointer to payload area
1858  */
1859 void *kuc_alloc(int payload_len, int transport, int type)
1860 {
1861         struct kuc_hdr *lh;
1862         int len = kuc_len(payload_len);
1863
1864         OBD_ALLOC(lh, len);
1865         if (lh == NULL)
1866                 return ERR_PTR(-ENOMEM);
1867
1868         lh->kuc_magic = KUC_MAGIC;
1869         lh->kuc_transport = transport;
1870         lh->kuc_msgtype = type;
1871         lh->kuc_msglen = len;
1872
1873         return (void *)(lh + 1);
1874 }
1875 EXPORT_SYMBOL(kuc_alloc);
1876
/* Free a message obtained from kuc_alloc().
 *
 * Deliberately not declared "inline": the function has external linkage
 * and is exported, and under C99/C11 inline rules a plain "inline"
 * definition does not emit the external symbol.
 *
 * @param p Pointer to payload area (kuc_ptr() asserts the header magic)
 * @param payload_len Payload size used to compute the size to free;
 *        presumably it must match the value given to kuc_alloc()
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh = kuc_ptr(p);

	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1884
1885 struct obd_request_slot_waiter {
1886         struct list_head        orsw_entry;
1887         wait_queue_head_t       orsw_waitq;
1888         bool                    orsw_signaled;
1889 };
1890
1891 static bool obd_request_slot_avail(struct client_obd *cli,
1892                                    struct obd_request_slot_waiter *orsw)
1893 {
1894         bool avail;
1895
1896         client_obd_list_lock(&cli->cl_loi_list_lock);
1897         avail = !!list_empty(&orsw->orsw_entry);
1898         client_obd_list_unlock(&cli->cl_loi_list_lock);
1899
1900         return avail;
1901 };
1902
1903 /*
1904  * For network flow control, the RPC sponsor needs to acquire a credit
1905  * before sending the RPC. The credits count for a connection is defined
1906  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1907  * the subsequent RPC sponsors need to wait until others released their
1908  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1909  */
1910 int obd_get_request_slot(struct client_obd *cli)
1911 {
1912         struct obd_request_slot_waiter   orsw;
1913         struct l_wait_info               lwi;
1914         int                              rc;
1915
1916         client_obd_list_lock(&cli->cl_loi_list_lock);
1917         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1918                 cli->cl_r_in_flight++;
1919                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1920                 return 0;
1921         }
1922
1923         init_waitqueue_head(&orsw.orsw_waitq);
1924         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1925         orsw.orsw_signaled = false;
1926         client_obd_list_unlock(&cli->cl_loi_list_lock);
1927
1928         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1929         rc = l_wait_event(orsw.orsw_waitq,
1930                           obd_request_slot_avail(cli, &orsw) ||
1931                           orsw.orsw_signaled,
1932                           &lwi);
1933
1934         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1935          * freed but other (such as obd_put_request_slot) is using it. */
1936         client_obd_list_lock(&cli->cl_loi_list_lock);
1937         if (rc != 0) {
1938                 if (!orsw.orsw_signaled) {
1939                         if (list_empty(&orsw.orsw_entry))
1940                                 cli->cl_r_in_flight--;
1941                         else
1942                                 list_del(&orsw.orsw_entry);
1943                 }
1944         }
1945
1946         if (orsw.orsw_signaled) {
1947                 LASSERT(list_empty(&orsw.orsw_entry));
1948
1949                 rc = -EINTR;
1950         }
1951         client_obd_list_unlock(&cli->cl_loi_list_lock);
1952
1953         return rc;
1954 }
1955 EXPORT_SYMBOL(obd_get_request_slot);
1956
1957 void obd_put_request_slot(struct client_obd *cli)
1958 {
1959         struct obd_request_slot_waiter *orsw;
1960
1961         client_obd_list_lock(&cli->cl_loi_list_lock);
1962         cli->cl_r_in_flight--;
1963
1964         /* If there is free slot, wakeup the first waiter. */
1965         if (!list_empty(&cli->cl_loi_read_list) &&
1966             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
1967                 orsw = list_entry(cli->cl_loi_read_list.next,
1968                                   struct obd_request_slot_waiter, orsw_entry);
1969                 list_del_init(&orsw->orsw_entry);
1970                 cli->cl_r_in_flight++;
1971                 wake_up(&orsw->orsw_waitq);
1972         }
1973         client_obd_list_unlock(&cli->cl_loi_list_lock);
1974 }
1975 EXPORT_SYMBOL(obd_put_request_slot);
1976
1977 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1978 {
1979         return cli->cl_max_rpcs_in_flight;
1980 }
1981 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
1982
1983 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1984 {
1985         struct obd_request_slot_waiter *orsw;
1986         __u32                           old;
1987         int                             diff;
1988         int                             i;
1989
1990         if (max > OBD_MAX_RIF_MAX || max < 1)
1991                 return -ERANGE;
1992
1993         client_obd_list_lock(&cli->cl_loi_list_lock);
1994         old = cli->cl_max_rpcs_in_flight;
1995         cli->cl_max_rpcs_in_flight = max;
1996         diff = max - old;
1997
1998         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
1999         for (i = 0; i < diff; i++) {
2000                 if (list_empty(&cli->cl_loi_read_list))
2001                         break;
2002
2003                 orsw = list_entry(cli->cl_loi_read_list.next,
2004                                   struct obd_request_slot_waiter, orsw_entry);
2005                 list_del_init(&orsw->orsw_entry);
2006                 cli->cl_r_in_flight++;
2007                 wake_up(&orsw->orsw_waitq);
2008         }
2009         client_obd_list_unlock(&cli->cl_loi_list_lock);
2010
2011         return 0;
2012 }
2013 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);