Whamcloud - gitweb
37757a87de91899812c180c1d33d97a770b914bf
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_class.h>
47 #include <lprocfs_status.h>
48
49 spinlock_t obd_types_lock;
50
51 struct kmem_cache *obd_device_cachep;
52 struct kmem_cache *obdo_cachep;
53 EXPORT_SYMBOL(obdo_cachep);
54 struct kmem_cache *import_cachep;
55
56 struct list_head obd_zombie_imports;
57 struct list_head obd_zombie_exports;
58 spinlock_t  obd_zombie_impexp_lock;
59 static void obd_zombie_impexp_notify(void);
60 static void obd_zombie_export_add(struct obd_export *exp);
61 static void obd_zombie_import_add(struct obd_import *imp);
62 static void print_export_data(struct obd_export *exp,
63                               const char *status, int locks);
64
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
66 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
67
68 /*
69  * support functions: we could use inter-module communication, but this
70  * is more portable to other OS's
71  */
72 static struct obd_device *obd_device_alloc(void)
73 {
74         struct obd_device *obd;
75
76         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
77         if (obd != NULL) {
78                 obd->obd_magic = OBD_DEVICE_MAGIC;
79         }
80         return obd;
81 }
82
83 static void obd_device_free(struct obd_device *obd)
84 {
85         LASSERT(obd != NULL);
86         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88         if (obd->obd_namespace != NULL) {
89                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90                        obd, obd->obd_namespace, obd->obd_force);
91                 LBUG();
92         }
93         lu_ref_fini(&obd->obd_reference);
94         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
95 }
96
97 struct obd_type *class_search_type(const char *name)
98 {
99         struct list_head *tmp;
100         struct obd_type *type;
101
102         spin_lock(&obd_types_lock);
103         list_for_each(tmp, &obd_types) {
104                 type = list_entry(tmp, struct obd_type, typ_chain);
105                 if (strcmp(type->typ_name, name) == 0) {
106                         spin_unlock(&obd_types_lock);
107                         return type;
108                 }
109         }
110         spin_unlock(&obd_types_lock);
111         return NULL;
112 }
113 EXPORT_SYMBOL(class_search_type);
114
115 struct obd_type *class_get_type(const char *name)
116 {
117         struct obd_type *type = class_search_type(name);
118
119 #ifdef HAVE_MODULE_LOADING_SUPPORT
120         if (!type) {
121                 const char *modname = name;
122
123                 if (strcmp(modname, "obdfilter") == 0)
124                         modname = "ofd";
125
126                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
127                         modname = LUSTRE_OSP_NAME;
128
129                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
130                         modname = LUSTRE_MDT_NAME;
131
132                 if (!request_module("%s", modname)) {
133                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
134                         type = class_search_type(name);
135                 } else {
136                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
137                                            modname);
138                 }
139         }
140 #endif
141         if (type) {
142                 spin_lock(&type->obd_type_lock);
143                 type->typ_refcnt++;
144                 try_module_get(type->typ_dt_ops->o_owner);
145                 spin_unlock(&type->obd_type_lock);
146         }
147         return type;
148 }
149 EXPORT_SYMBOL(class_get_type);
150
151 void class_put_type(struct obd_type *type)
152 {
153         LASSERT(type);
154         spin_lock(&type->obd_type_lock);
155         type->typ_refcnt--;
156         module_put(type->typ_dt_ops->o_owner);
157         spin_unlock(&type->obd_type_lock);
158 }
159 EXPORT_SYMBOL(class_put_type);
160
161 #define CLASS_MAX_NAME 1024
162
163 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
164                         bool enable_proc, struct lprocfs_seq_vars *module_vars,
165 #ifndef HAVE_ONLY_PROCFS_SEQ
166                         struct lprocfs_vars *vars,
167 #endif
168                         const char *name, struct lu_device_type *ldt)
169 {
170         struct obd_type *type;
171         int rc = 0;
172         ENTRY;
173
174         /* sanity check */
175         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
176
177         if (class_search_type(name)) {
178                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
179                 RETURN(-EEXIST);
180         }
181
182         rc = -ENOMEM;
183         OBD_ALLOC(type, sizeof(*type));
184         if (type == NULL)
185                 RETURN(rc);
186
187         OBD_ALLOC_PTR(type->typ_dt_ops);
188         OBD_ALLOC_PTR(type->typ_md_ops);
189         OBD_ALLOC(type->typ_name, strlen(name) + 1);
190
191         if (type->typ_dt_ops == NULL ||
192             type->typ_md_ops == NULL ||
193             type->typ_name == NULL)
194                 GOTO (failed, rc);
195
196         *(type->typ_dt_ops) = *dt_ops;
197         /* md_ops is optional */
198         if (md_ops)
199                 *(type->typ_md_ops) = *md_ops;
200         strcpy(type->typ_name, name);
201         spin_lock_init(&type->obd_type_lock);
202
203 #ifdef LPROCFS
204         if (enable_proc) {
205 #ifndef HAVE_ONLY_PROCFS_SEQ
206                 if (vars) {
207                         type->typ_procroot = lprocfs_register(type->typ_name,
208                                                               proc_lustre_root,
209                                                               vars, type);
210                 } else
211 #endif
212                 {
213                         type->typ_procroot = lprocfs_seq_register(type->typ_name,
214                                                                   proc_lustre_root,
215                                                                   module_vars, type);
216                 }
217                 if (IS_ERR(type->typ_procroot)) {
218                         rc = PTR_ERR(type->typ_procroot);
219                         type->typ_procroot = NULL;
220                         GOTO(failed, rc);
221                 }
222         }
223 #endif
224         if (ldt != NULL) {
225                 type->typ_lu = ldt;
226                 rc = lu_device_type_init(ldt);
227                 if (rc != 0)
228                         GOTO (failed, rc);
229         }
230
231         spin_lock(&obd_types_lock);
232         list_add(&type->typ_chain, &obd_types);
233         spin_unlock(&obd_types_lock);
234
235         RETURN (0);
236
237 failed:
238         if (type->typ_name != NULL) {
239 #ifdef LPROCFS
240                 if (type->typ_procroot != NULL) {
241 #ifndef HAVE_ONLY_PROCFS_SEQ
242                         lprocfs_try_remove_proc_entry(type->typ_name,
243                                                       proc_lustre_root);
244 #else
245                         remove_proc_subtree(type->typ_name, proc_lustre_root);
246 #endif
247                 }
248 #endif
249                 OBD_FREE(type->typ_name, strlen(name) + 1);
250         }
251         if (type->typ_md_ops != NULL)
252                 OBD_FREE_PTR(type->typ_md_ops);
253         if (type->typ_dt_ops != NULL)
254                 OBD_FREE_PTR(type->typ_dt_ops);
255         OBD_FREE(type, sizeof(*type));
256         RETURN(rc);
257 }
258 EXPORT_SYMBOL(class_register_type);
259
260 int class_unregister_type(const char *name)
261 {
262         struct obd_type *type = class_search_type(name);
263         ENTRY;
264
265         if (!type) {
266                 CERROR("unknown obd type\n");
267                 RETURN(-EINVAL);
268         }
269
270         if (type->typ_refcnt) {
271                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
272                 /* This is a bad situation, let's make the best of it */
273                 /* Remove ops, but leave the name for debugging */
274                 OBD_FREE_PTR(type->typ_dt_ops);
275                 OBD_FREE_PTR(type->typ_md_ops);
276                 RETURN(-EBUSY);
277         }
278
279         /* we do not use type->typ_procroot as for compatibility purposes
280          * other modules can share names (i.e. lod can use lov entry). so
281          * we can't reference pointer as it can get invalided when another
282          * module removes the entry */
283 #ifdef LPROCFS
284         if (type->typ_procroot != NULL) {
285 #ifndef HAVE_ONLY_PROCFS_SEQ
286                 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
287 #else
288                 remove_proc_subtree(type->typ_name, proc_lustre_root);
289 #endif
290         }
291
292         if (type->typ_procsym != NULL)
293                 lprocfs_remove(&type->typ_procsym);
294 #endif
295         if (type->typ_lu)
296                 lu_device_type_fini(type->typ_lu);
297
298         spin_lock(&obd_types_lock);
299         list_del(&type->typ_chain);
300         spin_unlock(&obd_types_lock);
301         OBD_FREE(type->typ_name, strlen(name) + 1);
302         if (type->typ_dt_ops != NULL)
303                 OBD_FREE_PTR(type->typ_dt_ops);
304         if (type->typ_md_ops != NULL)
305                 OBD_FREE_PTR(type->typ_md_ops);
306         OBD_FREE(type, sizeof(*type));
307         RETURN(0);
308 } /* class_unregister_type */
309 EXPORT_SYMBOL(class_unregister_type);
310
311 /**
312  * Create a new obd device.
313  *
314  * Find an empty slot in ::obd_devs[], create a new obd device in it.
315  *
316  * \param[in] type_name obd device type string.
317  * \param[in] name      obd device name.
318  *
319  * \retval NULL if create fails, otherwise return the obd device
320  *         pointer created.
321  */
322 struct obd_device *class_newdev(const char *type_name, const char *name)
323 {
324         struct obd_device *result = NULL;
325         struct obd_device *newdev;
326         struct obd_type *type = NULL;
327         int i;
328         int new_obd_minor = 0;
329         ENTRY;
330
331         if (strlen(name) >= MAX_OBD_NAME) {
332                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
333                 RETURN(ERR_PTR(-EINVAL));
334         }
335
336         type = class_get_type(type_name);
337         if (type == NULL){
338                 CERROR("OBD: unknown type: %s\n", type_name);
339                 RETURN(ERR_PTR(-ENODEV));
340         }
341
342         newdev = obd_device_alloc();
343         if (newdev == NULL)
344                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
345
346         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
347
348         write_lock(&obd_dev_lock);
349         for (i = 0; i < class_devno_max(); i++) {
350                 struct obd_device *obd = class_num2obd(i);
351
352                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
353                         CERROR("Device %s already exists at %d, won't add\n",
354                                name, i);
355                         if (result) {
356                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
357                                          "%p obd_magic %08x != %08x\n", result,
358                                          result->obd_magic, OBD_DEVICE_MAGIC);
359                                 LASSERTF(result->obd_minor == new_obd_minor,
360                                          "%p obd_minor %d != %d\n", result,
361                                          result->obd_minor, new_obd_minor);
362
363                                 obd_devs[result->obd_minor] = NULL;
364                                 result->obd_name[0]='\0';
365                          }
366                         result = ERR_PTR(-EEXIST);
367                         break;
368                 }
369                 if (!result && !obd) {
370                         result = newdev;
371                         result->obd_minor = i;
372                         new_obd_minor = i;
373                         result->obd_type = type;
374                         strncpy(result->obd_name, name,
375                                 sizeof(result->obd_name) - 1);
376                         obd_devs[i] = result;
377                 }
378         }
379         write_unlock(&obd_dev_lock);
380
381         if (result == NULL && i >= class_devno_max()) {
382                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
383                        class_devno_max());
384                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
385         }
386
387         if (IS_ERR(result))
388                 GOTO(out, result);
389
390         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
391                result->obd_name, result);
392
393         RETURN(result);
394 out:
395         obd_device_free(newdev);
396 out_type:
397         class_put_type(type);
398         return result;
399 }
400
401 void class_release_dev(struct obd_device *obd)
402 {
403         struct obd_type *obd_type = obd->obd_type;
404
405         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
406                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
407         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
408                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
409         LASSERT(obd_type != NULL);
410
411         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
412                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
413
414         write_lock(&obd_dev_lock);
415         obd_devs[obd->obd_minor] = NULL;
416         write_unlock(&obd_dev_lock);
417         obd_device_free(obd);
418
419         class_put_type(obd_type);
420 }
421
422 int class_name2dev(const char *name)
423 {
424         int i;
425
426         if (!name)
427                 return -1;
428
429         read_lock(&obd_dev_lock);
430         for (i = 0; i < class_devno_max(); i++) {
431                 struct obd_device *obd = class_num2obd(i);
432
433                 if (obd && strcmp(name, obd->obd_name) == 0) {
434                         /* Make sure we finished attaching before we give
435                            out any references */
436                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
437                         if (obd->obd_attached) {
438                                 read_unlock(&obd_dev_lock);
439                                 return i;
440                         }
441                         break;
442                 }
443         }
444         read_unlock(&obd_dev_lock);
445
446         return -1;
447 }
448 EXPORT_SYMBOL(class_name2dev);
449
450 struct obd_device *class_name2obd(const char *name)
451 {
452         int dev = class_name2dev(name);
453
454         if (dev < 0 || dev > class_devno_max())
455                 return NULL;
456         return class_num2obd(dev);
457 }
458 EXPORT_SYMBOL(class_name2obd);
459
460 int class_uuid2dev(struct obd_uuid *uuid)
461 {
462         int i;
463
464         read_lock(&obd_dev_lock);
465         for (i = 0; i < class_devno_max(); i++) {
466                 struct obd_device *obd = class_num2obd(i);
467
468                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
469                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
470                         read_unlock(&obd_dev_lock);
471                         return i;
472                 }
473         }
474         read_unlock(&obd_dev_lock);
475
476         return -1;
477 }
478 EXPORT_SYMBOL(class_uuid2dev);
479
480 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
481 {
482         int dev = class_uuid2dev(uuid);
483         if (dev < 0)
484                 return NULL;
485         return class_num2obd(dev);
486 }
487 EXPORT_SYMBOL(class_uuid2obd);
488
489 /**
490  * Get obd device from ::obd_devs[]
491  *
492  * \param num [in] array index
493  *
494  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
495  *         otherwise return the obd device there.
496  */
497 struct obd_device *class_num2obd(int num)
498 {
499         struct obd_device *obd = NULL;
500
501         if (num < class_devno_max()) {
502                 obd = obd_devs[num];
503                 if (obd == NULL)
504                         return NULL;
505
506                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
507                          "%p obd_magic %08x != %08x\n",
508                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
509                 LASSERTF(obd->obd_minor == num,
510                          "%p obd_minor %0d != %0d\n",
511                          obd, obd->obd_minor, num);
512         }
513
514         return obd;
515 }
516 EXPORT_SYMBOL(class_num2obd);
517
518 /**
519  * Get obd devices count. Device in any
520  *    state are counted
521  * \retval obd device count
522  */
523 int get_devices_count(void)
524 {
525         int index, max_index = class_devno_max(), dev_count = 0;
526
527         read_lock(&obd_dev_lock);
528         for (index = 0; index <= max_index; index++) {
529                 struct obd_device *obd = class_num2obd(index);
530                 if (obd != NULL)
531                         dev_count++;
532         }
533         read_unlock(&obd_dev_lock);
534
535         return dev_count;
536 }
537 EXPORT_SYMBOL(get_devices_count);
538
539 void class_obd_list(void)
540 {
541         char *status;
542         int i;
543
544         read_lock(&obd_dev_lock);
545         for (i = 0; i < class_devno_max(); i++) {
546                 struct obd_device *obd = class_num2obd(i);
547
548                 if (obd == NULL)
549                         continue;
550                 if (obd->obd_stopping)
551                         status = "ST";
552                 else if (obd->obd_set_up)
553                         status = "UP";
554                 else if (obd->obd_attached)
555                         status = "AT";
556                 else
557                         status = "--";
558                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
559                          i, status, obd->obd_type->typ_name,
560                          obd->obd_name, obd->obd_uuid.uuid,
561                          atomic_read(&obd->obd_refcount));
562         }
563         read_unlock(&obd_dev_lock);
564         return;
565 }
566
567 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
568    specified, then only the client with that uuid is returned,
569    otherwise any client connected to the tgt is returned. */
570 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
571                                           const char * typ_name,
572                                           struct obd_uuid *grp_uuid)
573 {
574         int i;
575
576         read_lock(&obd_dev_lock);
577         for (i = 0; i < class_devno_max(); i++) {
578                 struct obd_device *obd = class_num2obd(i);
579
580                 if (obd == NULL)
581                         continue;
582                 if ((strncmp(obd->obd_type->typ_name, typ_name,
583                              strlen(typ_name)) == 0)) {
584                         if (obd_uuid_equals(tgt_uuid,
585                                             &obd->u.cli.cl_target_uuid) &&
586                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
587                                                          &obd->obd_uuid) : 1)) {
588                                 read_unlock(&obd_dev_lock);
589                                 return obd;
590                         }
591                 }
592         }
593         read_unlock(&obd_dev_lock);
594
595         return NULL;
596 }
597 EXPORT_SYMBOL(class_find_client_obd);
598
599 /* Iterate the obd_device list looking devices have grp_uuid. Start
600    searching at *next, and if a device is found, the next index to look
601    at is saved in *next. If next is NULL, then the first matching device
602    will always be returned. */
603 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
604 {
605         int i;
606
607         if (next == NULL)
608                 i = 0;
609         else if (*next >= 0 && *next < class_devno_max())
610                 i = *next;
611         else
612                 return NULL;
613
614         read_lock(&obd_dev_lock);
615         for (; i < class_devno_max(); i++) {
616                 struct obd_device *obd = class_num2obd(i);
617
618                 if (obd == NULL)
619                         continue;
620                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
621                         if (next != NULL)
622                                 *next = i+1;
623                         read_unlock(&obd_dev_lock);
624                         return obd;
625                 }
626         }
627         read_unlock(&obd_dev_lock);
628
629         return NULL;
630 }
631 EXPORT_SYMBOL(class_devices_in_group);
632
633 /**
634  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
635  * adjust sptlrpc settings accordingly.
636  */
637 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
638 {
639         struct obd_device  *obd;
640         const char         *type;
641         int                 i, rc = 0, rc2;
642
643         LASSERT(namelen > 0);
644
645         read_lock(&obd_dev_lock);
646         for (i = 0; i < class_devno_max(); i++) {
647                 obd = class_num2obd(i);
648
649                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
650                         continue;
651
652                 /* only notify mdc, osc, mdt, ost */
653                 type = obd->obd_type->typ_name;
654                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
655                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
656                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
657                     strcmp(type, LUSTRE_OST_NAME) != 0)
658                         continue;
659
660                 if (strncmp(obd->obd_name, fsname, namelen))
661                         continue;
662
663                 class_incref(obd, __FUNCTION__, obd);
664                 read_unlock(&obd_dev_lock);
665                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
666                                          sizeof(KEY_SPTLRPC_CONF),
667                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
668                 rc = rc ? rc : rc2;
669                 class_decref(obd, __FUNCTION__, obd);
670                 read_lock(&obd_dev_lock);
671         }
672         read_unlock(&obd_dev_lock);
673         return rc;
674 }
675 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
676
677 void obd_cleanup_caches(void)
678 {
679         ENTRY;
680         if (obd_device_cachep) {
681                 kmem_cache_destroy(obd_device_cachep);
682                 obd_device_cachep = NULL;
683         }
684         if (obdo_cachep) {
685                 kmem_cache_destroy(obdo_cachep);
686                 obdo_cachep = NULL;
687         }
688         if (import_cachep) {
689                 kmem_cache_destroy(import_cachep);
690                 import_cachep = NULL;
691         }
692         if (capa_cachep) {
693                 kmem_cache_destroy(capa_cachep);
694                 capa_cachep = NULL;
695         }
696         EXIT;
697 }
698
699 int obd_init_caches(void)
700 {
701         int rc;
702         ENTRY;
703
704         LASSERT(obd_device_cachep == NULL);
705         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
706                                               sizeof(struct obd_device),
707                                               0, 0, NULL);
708         if (!obd_device_cachep)
709                 GOTO(out, rc = -ENOMEM);
710
711         LASSERT(obdo_cachep == NULL);
712         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
713                                         0, 0, NULL);
714         if (!obdo_cachep)
715                 GOTO(out, rc = -ENOMEM);
716
717         LASSERT(import_cachep == NULL);
718         import_cachep = kmem_cache_create("ll_import_cache",
719                                           sizeof(struct obd_import),
720                                           0, 0, NULL);
721         if (!import_cachep)
722                 GOTO(out, rc = -ENOMEM);
723
724         LASSERT(capa_cachep == NULL);
725         capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
726                                         0, 0, NULL);
727         if (!capa_cachep)
728                 GOTO(out, rc = -ENOMEM);
729
730         RETURN(0);
731 out:
732         obd_cleanup_caches();
733         RETURN(rc);
734 }
735
736 /* map connection to client */
737 struct obd_export *class_conn2export(struct lustre_handle *conn)
738 {
739         struct obd_export *export;
740         ENTRY;
741
742         if (!conn) {
743                 CDEBUG(D_CACHE, "looking for null handle\n");
744                 RETURN(NULL);
745         }
746
747         if (conn->cookie == -1) {  /* this means assign a new connection */
748                 CDEBUG(D_CACHE, "want a new connection\n");
749                 RETURN(NULL);
750         }
751
752         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
753         export = class_handle2object(conn->cookie, NULL);
754         RETURN(export);
755 }
756 EXPORT_SYMBOL(class_conn2export);
757
758 struct obd_device *class_exp2obd(struct obd_export *exp)
759 {
760         if (exp)
761                 return exp->exp_obd;
762         return NULL;
763 }
764 EXPORT_SYMBOL(class_exp2obd);
765
766 struct obd_device *class_conn2obd(struct lustre_handle *conn)
767 {
768         struct obd_export *export;
769         export = class_conn2export(conn);
770         if (export) {
771                 struct obd_device *obd = export->exp_obd;
772                 class_export_put(export);
773                 return obd;
774         }
775         return NULL;
776 }
777 EXPORT_SYMBOL(class_conn2obd);
778
779 struct obd_import *class_exp2cliimp(struct obd_export *exp)
780 {
781         struct obd_device *obd = exp->exp_obd;
782         if (obd == NULL)
783                 return NULL;
784         return obd->u.cli.cl_import;
785 }
786 EXPORT_SYMBOL(class_exp2cliimp);
787
788 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
789 {
790         struct obd_device *obd = class_conn2obd(conn);
791         if (obd == NULL)
792                 return NULL;
793         return obd->u.cli.cl_import;
794 }
795 EXPORT_SYMBOL(class_conn2cliimp);
796
797 /* Export management functions */
798 static void class_export_destroy(struct obd_export *exp)
799 {
800         struct obd_device *obd = exp->exp_obd;
801         ENTRY;
802
803         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
804         LASSERT(obd != NULL);
805
806         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
807                exp->exp_client_uuid.uuid, obd->obd_name);
808
809         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
810         if (exp->exp_connection)
811                 ptlrpc_put_connection_superhack(exp->exp_connection);
812
813         LASSERT(list_empty(&exp->exp_outstanding_replies));
814         LASSERT(list_empty(&exp->exp_uncommitted_replies));
815         LASSERT(list_empty(&exp->exp_req_replay_queue));
816         LASSERT(list_empty(&exp->exp_hp_rpcs));
817         obd_destroy_export(exp);
818         class_decref(obd, "export", exp);
819
820         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
821         EXIT;
822 }
823
824 static void export_handle_addref(void *export)
825 {
826         class_export_get(export);
827 }
828
829 static struct portals_handle_ops export_handle_ops = {
830         .hop_addref = export_handle_addref,
831         .hop_free   = NULL,
832 };
833
834 struct obd_export *class_export_get(struct obd_export *exp)
835 {
836         atomic_inc(&exp->exp_refcount);
837         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
838                atomic_read(&exp->exp_refcount));
839         return exp;
840 }
841 EXPORT_SYMBOL(class_export_get);
842
843 void class_export_put(struct obd_export *exp)
844 {
845         LASSERT(exp != NULL);
846         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
847         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
848                atomic_read(&exp->exp_refcount) - 1);
849
850         if (atomic_dec_and_test(&exp->exp_refcount)) {
851                 LASSERT(!list_empty(&exp->exp_obd_chain));
852                 CDEBUG(D_IOCTL, "final put %p/%s\n",
853                        exp, exp->exp_client_uuid.uuid);
854
855                 /* release nid stat refererence */
856                 lprocfs_exp_cleanup(exp);
857
858                 obd_zombie_export_add(exp);
859         }
860 }
861 EXPORT_SYMBOL(class_export_put);
862
863 /* Creates a new export, adds it to the hash table, and returns a
864  * pointer to it. The refcount is 2: one for the hash reference, and
865  * one for the pointer returned by this function. */
866 struct obd_export *class_new_export(struct obd_device *obd,
867                                     struct obd_uuid *cluuid)
868 {
869         struct obd_export *export;
870         cfs_hash_t *hash = NULL;
871         int rc = 0;
872         ENTRY;
873
874         OBD_ALLOC_PTR(export);
875         if (!export)
876                 return ERR_PTR(-ENOMEM);
877
878         export->exp_conn_cnt = 0;
879         export->exp_lock_hash = NULL;
880         export->exp_flock_hash = NULL;
881         atomic_set(&export->exp_refcount, 2);
882         atomic_set(&export->exp_rpc_count, 0);
883         atomic_set(&export->exp_cb_count, 0);
884         atomic_set(&export->exp_locks_count, 0);
885 #if LUSTRE_TRACKS_LOCK_EXP_REFS
886         INIT_LIST_HEAD(&export->exp_locks_list);
887         spin_lock_init(&export->exp_locks_list_guard);
888 #endif
889         atomic_set(&export->exp_replay_count, 0);
890         export->exp_obd = obd;
891         INIT_LIST_HEAD(&export->exp_outstanding_replies);
892         spin_lock_init(&export->exp_uncommitted_replies_lock);
893         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
894         INIT_LIST_HEAD(&export->exp_req_replay_queue);
895         INIT_LIST_HEAD(&export->exp_handle.h_link);
896         INIT_LIST_HEAD(&export->exp_hp_rpcs);
897         INIT_LIST_HEAD(&export->exp_reg_rpcs);
898         class_handle_hash(&export->exp_handle, &export_handle_ops);
899         export->exp_last_request_time = cfs_time_current_sec();
900         spin_lock_init(&export->exp_lock);
901         spin_lock_init(&export->exp_rpc_lock);
902         INIT_HLIST_NODE(&export->exp_uuid_hash);
903         INIT_HLIST_NODE(&export->exp_nid_hash);
904         spin_lock_init(&export->exp_bl_list_lock);
905         INIT_LIST_HEAD(&export->exp_bl_list);
906
907         export->exp_sp_peer = LUSTRE_SP_ANY;
908         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
909         export->exp_client_uuid = *cluuid;
910         obd_init_export(export);
911
912         spin_lock(&obd->obd_dev_lock);
913         /* shouldn't happen, but might race */
914         if (obd->obd_stopping)
915                 GOTO(exit_unlock, rc = -ENODEV);
916
917         hash = cfs_hash_getref(obd->obd_uuid_hash);
918         if (hash == NULL)
919                 GOTO(exit_unlock, rc = -ENODEV);
920         spin_unlock(&obd->obd_dev_lock);
921
922         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
923                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
924                 if (rc != 0) {
925                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
926                                       obd->obd_name, cluuid->uuid, rc);
927                         GOTO(exit_err, rc = -EALREADY);
928                 }
929         }
930
931         spin_lock(&obd->obd_dev_lock);
932         if (obd->obd_stopping) {
933                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
934                 GOTO(exit_unlock, rc = -ENODEV);
935         }
936
937         class_incref(obd, "export", export);
938         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
939         list_add_tail(&export->exp_obd_chain_timed,
940                       &export->exp_obd->obd_exports_timed);
941         export->exp_obd->obd_num_exports++;
942         spin_unlock(&obd->obd_dev_lock);
943         cfs_hash_putref(hash);
944         RETURN(export);
945
946 exit_unlock:
947         spin_unlock(&obd->obd_dev_lock);
948 exit_err:
949         if (hash)
950                 cfs_hash_putref(hash);
951         class_handle_unhash(&export->exp_handle);
952         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
953         obd_destroy_export(export);
954         OBD_FREE_PTR(export);
955         return ERR_PTR(rc);
956 }
957 EXPORT_SYMBOL(class_new_export);
958
959 void class_unlink_export(struct obd_export *exp)
960 {
961         class_handle_unhash(&exp->exp_handle);
962
963         spin_lock(&exp->exp_obd->obd_dev_lock);
964         /* delete an uuid-export hashitem from hashtables */
965         if (!hlist_unhashed(&exp->exp_uuid_hash))
966                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
967                              &exp->exp_client_uuid,
968                              &exp->exp_uuid_hash);
969
970         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
971         list_del_init(&exp->exp_obd_chain_timed);
972         exp->exp_obd->obd_num_exports--;
973         spin_unlock(&exp->exp_obd->obd_dev_lock);
974         class_export_put(exp);
975 }
976 EXPORT_SYMBOL(class_unlink_export);
977
978 /* Import management functions */
979 void class_import_destroy(struct obd_import *imp)
980 {
981         ENTRY;
982
983         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
984                 imp->imp_obd->obd_name);
985
986         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
987
988         ptlrpc_put_connection_superhack(imp->imp_connection);
989
990         while (!list_empty(&imp->imp_conn_list)) {
991                 struct obd_import_conn *imp_conn;
992
993                 imp_conn = list_entry(imp->imp_conn_list.next,
994                                       struct obd_import_conn, oic_item);
995                 list_del_init(&imp_conn->oic_item);
996                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
997                 OBD_FREE(imp_conn, sizeof(*imp_conn));
998         }
999
1000         LASSERT(imp->imp_sec == NULL);
1001         class_decref(imp->imp_obd, "import", imp);
1002         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
1003         EXIT;
1004 }
1005
1006 static void import_handle_addref(void *import)
1007 {
1008         class_import_get(import);
1009 }
1010
1011 static struct portals_handle_ops import_handle_ops = {
1012         .hop_addref = import_handle_addref,
1013         .hop_free   = NULL,
1014 };
1015
1016 struct obd_import *class_import_get(struct obd_import *import)
1017 {
1018         atomic_inc(&import->imp_refcount);
1019         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1020                atomic_read(&import->imp_refcount),
1021                import->imp_obd->obd_name);
1022         return import;
1023 }
1024 EXPORT_SYMBOL(class_import_get);
1025
1026 void class_import_put(struct obd_import *imp)
1027 {
1028         ENTRY;
1029
1030         LASSERT(list_empty(&imp->imp_zombie_chain));
1031         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1032
1033         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1034                atomic_read(&imp->imp_refcount) - 1,
1035                imp->imp_obd->obd_name);
1036
1037         if (atomic_dec_and_test(&imp->imp_refcount)) {
1038                 CDEBUG(D_INFO, "final put import %p\n", imp);
1039                 obd_zombie_import_add(imp);
1040         }
1041
1042         /* catch possible import put race */
1043         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1044         EXIT;
1045 }
1046 EXPORT_SYMBOL(class_import_put);
1047
1048 static void init_imp_at(struct imp_at *at) {
1049         int i;
1050         at_init(&at->iat_net_latency, 0, 0);
1051         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1052                 /* max service estimates are tracked on the server side, so
1053                    don't use the AT history here, just use the last reported
1054                    val. (But keep hist for proc histogram, worst_ever) */
1055                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1056                         AT_FLG_NOHIST);
1057         }
1058 }
1059
1060 struct obd_import *class_new_import(struct obd_device *obd)
1061 {
1062         struct obd_import *imp;
1063
1064         OBD_ALLOC(imp, sizeof(*imp));
1065         if (imp == NULL)
1066                 return NULL;
1067
1068         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1069         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1070         INIT_LIST_HEAD(&imp->imp_replay_list);
1071         INIT_LIST_HEAD(&imp->imp_sending_list);
1072         INIT_LIST_HEAD(&imp->imp_delayed_list);
1073         INIT_LIST_HEAD(&imp->imp_committed_list);
1074         imp->imp_replay_cursor = &imp->imp_committed_list;
1075         spin_lock_init(&imp->imp_lock);
1076         imp->imp_last_success_conn = 0;
1077         imp->imp_state = LUSTRE_IMP_NEW;
1078         imp->imp_obd = class_incref(obd, "import", imp);
1079         mutex_init(&imp->imp_sec_mutex);
1080         init_waitqueue_head(&imp->imp_recovery_waitq);
1081
1082         atomic_set(&imp->imp_refcount, 2);
1083         atomic_set(&imp->imp_unregistering, 0);
1084         atomic_set(&imp->imp_inflight, 0);
1085         atomic_set(&imp->imp_replay_inflight, 0);
1086         atomic_set(&imp->imp_inval_count, 0);
1087         INIT_LIST_HEAD(&imp->imp_conn_list);
1088         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1089         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1090         init_imp_at(&imp->imp_at);
1091
1092         /* the default magic is V2, will be used in connect RPC, and
1093          * then adjusted according to the flags in request/reply. */
1094         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1095
1096         return imp;
1097 }
1098 EXPORT_SYMBOL(class_new_import);
1099
1100 void class_destroy_import(struct obd_import *import)
1101 {
1102         LASSERT(import != NULL);
1103         LASSERT(import != LP_POISON);
1104
1105         class_handle_unhash(&import->imp_handle);
1106
1107         spin_lock(&import->imp_lock);
1108         import->imp_generation++;
1109         spin_unlock(&import->imp_lock);
1110         class_import_put(import);
1111 }
1112 EXPORT_SYMBOL(class_destroy_import);
1113
1114 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1115
1116 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1117 {
1118         spin_lock(&exp->exp_locks_list_guard);
1119
1120         LASSERT(lock->l_exp_refs_nr >= 0);
1121
1122         if (lock->l_exp_refs_target != NULL &&
1123             lock->l_exp_refs_target != exp) {
1124                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1125                               exp, lock, lock->l_exp_refs_target);
1126         }
1127         if ((lock->l_exp_refs_nr ++) == 0) {
1128                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1129                 lock->l_exp_refs_target = exp;
1130         }
1131         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1132                lock, exp, lock->l_exp_refs_nr);
1133         spin_unlock(&exp->exp_locks_list_guard);
1134 }
1135 EXPORT_SYMBOL(__class_export_add_lock_ref);
1136
1137 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1138 {
1139         spin_lock(&exp->exp_locks_list_guard);
1140         LASSERT(lock->l_exp_refs_nr > 0);
1141         if (lock->l_exp_refs_target != exp) {
1142                 LCONSOLE_WARN("lock %p, "
1143                               "mismatching export pointers: %p, %p\n",
1144                               lock, lock->l_exp_refs_target, exp);
1145         }
1146         if (-- lock->l_exp_refs_nr == 0) {
1147                 list_del_init(&lock->l_exp_refs_link);
1148                 lock->l_exp_refs_target = NULL;
1149         }
1150         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1151                lock, exp, lock->l_exp_refs_nr);
1152         spin_unlock(&exp->exp_locks_list_guard);
1153 }
1154 EXPORT_SYMBOL(__class_export_del_lock_ref);
1155 #endif
1156
1157 /* A connection defines an export context in which preallocation can
1158    be managed. This releases the export pointer reference, and returns
1159    the export handle, so the export refcount is 1 when this function
1160    returns. */
1161 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1162                   struct obd_uuid *cluuid)
1163 {
1164         struct obd_export *export;
1165         LASSERT(conn != NULL);
1166         LASSERT(obd != NULL);
1167         LASSERT(cluuid != NULL);
1168         ENTRY;
1169
1170         export = class_new_export(obd, cluuid);
1171         if (IS_ERR(export))
1172                 RETURN(PTR_ERR(export));
1173
1174         conn->cookie = export->exp_handle.h_cookie;
1175         class_export_put(export);
1176
1177         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1178                cluuid->uuid, conn->cookie);
1179         RETURN(0);
1180 }
1181 EXPORT_SYMBOL(class_connect);
1182
1183 /* if export is involved in recovery then clean up related things */
1184 void class_export_recovery_cleanup(struct obd_export *exp)
1185 {
1186         struct obd_device *obd = exp->exp_obd;
1187
1188         spin_lock(&obd->obd_recovery_task_lock);
1189         if (obd->obd_recovering) {
1190                 if (exp->exp_in_recovery) {
1191                         spin_lock(&exp->exp_lock);
1192                         exp->exp_in_recovery = 0;
1193                         spin_unlock(&exp->exp_lock);
1194                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1195                         atomic_dec(&obd->obd_connected_clients);
1196                 }
1197
1198                 /* if called during recovery then should update
1199                  * obd_stale_clients counter,
1200                  * lightweight exports are not counted */
1201                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1202                         exp->exp_obd->obd_stale_clients++;
1203         }
1204         spin_unlock(&obd->obd_recovery_task_lock);
1205
1206         spin_lock(&exp->exp_lock);
1207         /** Cleanup req replay fields */
1208         if (exp->exp_req_replay_needed) {
1209                 exp->exp_req_replay_needed = 0;
1210
1211                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1212                 atomic_dec(&obd->obd_req_replay_clients);
1213         }
1214
1215         /** Cleanup lock replay data */
1216         if (exp->exp_lock_replay_needed) {
1217                 exp->exp_lock_replay_needed = 0;
1218
1219                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1220                 atomic_dec(&obd->obd_lock_replay_clients);
1221         }
1222         spin_unlock(&exp->exp_lock);
1223 }
1224
1225 /* This function removes 1-3 references from the export:
1226  * 1 - for export pointer passed
1227  * and if disconnect really need
1228  * 2 - removing from hash
1229  * 3 - in client_unlink_export
1230  * The export pointer passed to this function can destroyed */
1231 int class_disconnect(struct obd_export *export)
1232 {
1233         int already_disconnected;
1234         ENTRY;
1235
1236         if (export == NULL) {
1237                 CWARN("attempting to free NULL export %p\n", export);
1238                 RETURN(-EINVAL);
1239         }
1240
1241         spin_lock(&export->exp_lock);
1242         already_disconnected = export->exp_disconnected;
1243         export->exp_disconnected = 1;
1244         spin_unlock(&export->exp_lock);
1245
1246         /* class_cleanup(), abort_recovery(), and class_fail_export()
1247          * all end up in here, and if any of them race we shouldn't
1248          * call extra class_export_puts(). */
1249         if (already_disconnected) {
1250                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1251                 GOTO(no_disconn, already_disconnected);
1252         }
1253
1254         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1255                export->exp_handle.h_cookie);
1256
1257         if (!hlist_unhashed(&export->exp_nid_hash))
1258                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1259                              &export->exp_connection->c_peer.nid,
1260                              &export->exp_nid_hash);
1261
1262         class_export_recovery_cleanup(export);
1263         class_unlink_export(export);
1264 no_disconn:
1265         class_export_put(export);
1266         RETURN(0);
1267 }
1268 EXPORT_SYMBOL(class_disconnect);
1269
1270 /* Return non-zero for a fully connected export */
1271 int class_connected_export(struct obd_export *exp)
1272 {
1273         int connected = 0;
1274
1275         if (exp) {
1276                 spin_lock(&exp->exp_lock);
1277                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1278                 spin_unlock(&exp->exp_lock);
1279         }
1280         return connected;
1281 }
1282 EXPORT_SYMBOL(class_connected_export);
1283
1284 static void class_disconnect_export_list(struct list_head *list,
1285                                          enum obd_option flags)
1286 {
1287         int rc;
1288         struct obd_export *exp;
1289         ENTRY;
1290
1291         /* It's possible that an export may disconnect itself, but
1292          * nothing else will be added to this list. */
1293         while (!list_empty(list)) {
1294                 exp = list_entry(list->next, struct obd_export,
1295                                  exp_obd_chain);
1296                 /* need for safe call CDEBUG after obd_disconnect */
1297                 class_export_get(exp);
1298
1299                 spin_lock(&exp->exp_lock);
1300                 exp->exp_flags = flags;
1301                 spin_unlock(&exp->exp_lock);
1302
1303                 if (obd_uuid_equals(&exp->exp_client_uuid,
1304                                     &exp->exp_obd->obd_uuid)) {
1305                         CDEBUG(D_HA,
1306                                "exp %p export uuid == obd uuid, don't discon\n",
1307                                exp);
1308                         /* Need to delete this now so we don't end up pointing
1309                          * to work_list later when this export is cleaned up. */
1310                         list_del_init(&exp->exp_obd_chain);
1311                         class_export_put(exp);
1312                         continue;
1313                 }
1314
1315                 class_export_get(exp);
1316                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1317                        "last request at "CFS_TIME_T"\n",
1318                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1319                        exp, exp->exp_last_request_time);
1320                 /* release one export reference anyway */
1321                 rc = obd_disconnect(exp);
1322
1323                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1324                        obd_export_nid2str(exp), exp, rc);
1325                 class_export_put(exp);
1326         }
1327         EXIT;
1328 }
1329
1330 void class_disconnect_exports(struct obd_device *obd)
1331 {
1332         struct list_head work_list;
1333         ENTRY;
1334
1335         /* Move all of the exports from obd_exports to a work list, en masse. */
1336         INIT_LIST_HEAD(&work_list);
1337         spin_lock(&obd->obd_dev_lock);
1338         list_splice_init(&obd->obd_exports, &work_list);
1339         list_splice_init(&obd->obd_delayed_exports, &work_list);
1340         spin_unlock(&obd->obd_dev_lock);
1341
1342         if (!list_empty(&work_list)) {
1343                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1344                        "disconnecting them\n", obd->obd_minor, obd);
1345                 class_disconnect_export_list(&work_list,
1346                                              exp_flags_from_obd(obd));
1347         } else
1348                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1349                        obd->obd_minor, obd);
1350         EXIT;
1351 }
1352 EXPORT_SYMBOL(class_disconnect_exports);
1353
1354 /* Remove exports that have not completed recovery.
1355  */
1356 void class_disconnect_stale_exports(struct obd_device *obd,
1357                                     int (*test_export)(struct obd_export *))
1358 {
1359         struct list_head work_list;
1360         struct obd_export *exp, *n;
1361         int evicted = 0;
1362         ENTRY;
1363
1364         INIT_LIST_HEAD(&work_list);
1365         spin_lock(&obd->obd_dev_lock);
1366         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1367                                  exp_obd_chain) {
1368                 /* don't count self-export as client */
1369                 if (obd_uuid_equals(&exp->exp_client_uuid,
1370                                     &exp->exp_obd->obd_uuid))
1371                         continue;
1372
1373                 /* don't evict clients which have no slot in last_rcvd
1374                  * (e.g. lightweight connection) */
1375                 if (exp->exp_target_data.ted_lr_idx == -1)
1376                         continue;
1377
1378                 spin_lock(&exp->exp_lock);
1379                 if (exp->exp_failed || test_export(exp)) {
1380                         spin_unlock(&exp->exp_lock);
1381                         continue;
1382                 }
1383                 exp->exp_failed = 1;
1384                 spin_unlock(&exp->exp_lock);
1385
1386                 list_move(&exp->exp_obd_chain, &work_list);
1387                 evicted++;
1388                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1389                        obd->obd_name, exp->exp_client_uuid.uuid,
1390                        exp->exp_connection == NULL ? "<unknown>" :
1391                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1392                 print_export_data(exp, "EVICTING", 0);
1393         }
1394         spin_unlock(&obd->obd_dev_lock);
1395
1396         if (evicted)
1397                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1398                               obd->obd_name, evicted);
1399
1400         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1401                                                  OBD_OPT_ABORT_RECOV);
1402         EXIT;
1403 }
1404 EXPORT_SYMBOL(class_disconnect_stale_exports);
1405
1406 void class_fail_export(struct obd_export *exp)
1407 {
1408         int rc, already_failed;
1409
1410         spin_lock(&exp->exp_lock);
1411         already_failed = exp->exp_failed;
1412         exp->exp_failed = 1;
1413         spin_unlock(&exp->exp_lock);
1414
1415         if (already_failed) {
1416                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1417                        exp, exp->exp_client_uuid.uuid);
1418                 return;
1419         }
1420
1421         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1422                exp, exp->exp_client_uuid.uuid);
1423
1424         if (obd_dump_on_timeout)
1425                 libcfs_debug_dumplog();
1426
1427         /* need for safe call CDEBUG after obd_disconnect */
1428         class_export_get(exp);
1429
1430         /* Most callers into obd_disconnect are removing their own reference
1431          * (request, for example) in addition to the one from the hash table.
1432          * We don't have such a reference here, so make one. */
1433         class_export_get(exp);
1434         rc = obd_disconnect(exp);
1435         if (rc)
1436                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1437         else
1438                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1439                        exp, exp->exp_client_uuid.uuid);
1440         class_export_put(exp);
1441 }
1442 EXPORT_SYMBOL(class_fail_export);
1443
1444 char *obd_export_nid2str(struct obd_export *exp)
1445 {
1446         if (exp->exp_connection != NULL)
1447                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1448
1449         return "(no nid)";
1450 }
1451 EXPORT_SYMBOL(obd_export_nid2str);
1452
1453 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1454 {
1455         cfs_hash_t *nid_hash;
1456         struct obd_export *doomed_exp = NULL;
1457         int exports_evicted = 0;
1458
1459         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1460
1461         spin_lock(&obd->obd_dev_lock);
1462         /* umount has run already, so evict thread should leave
1463          * its task to umount thread now */
1464         if (obd->obd_stopping) {
1465                 spin_unlock(&obd->obd_dev_lock);
1466                 return exports_evicted;
1467         }
1468         nid_hash = obd->obd_nid_hash;
1469         cfs_hash_getref(nid_hash);
1470         spin_unlock(&obd->obd_dev_lock);
1471
1472         do {
1473                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1474                 if (doomed_exp == NULL)
1475                         break;
1476
1477                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1478                          "nid %s found, wanted nid %s, requested nid %s\n",
1479                          obd_export_nid2str(doomed_exp),
1480                          libcfs_nid2str(nid_key), nid);
1481                 LASSERTF(doomed_exp != obd->obd_self_export,
1482                          "self-export is hashed by NID?\n");
1483                 exports_evicted++;
1484                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1485                               "request\n", obd->obd_name,
1486                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1487                               obd_export_nid2str(doomed_exp));
1488                 class_fail_export(doomed_exp);
1489                 class_export_put(doomed_exp);
1490         } while (1);
1491
1492         cfs_hash_putref(nid_hash);
1493
1494         if (!exports_evicted)
1495                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1496                        obd->obd_name, nid);
1497         return exports_evicted;
1498 }
1499 EXPORT_SYMBOL(obd_export_evict_by_nid);
1500
1501 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1502 {
1503         cfs_hash_t *uuid_hash;
1504         struct obd_export *doomed_exp = NULL;
1505         struct obd_uuid doomed_uuid;
1506         int exports_evicted = 0;
1507
1508         spin_lock(&obd->obd_dev_lock);
1509         if (obd->obd_stopping) {
1510                 spin_unlock(&obd->obd_dev_lock);
1511                 return exports_evicted;
1512         }
1513         uuid_hash = obd->obd_uuid_hash;
1514         cfs_hash_getref(uuid_hash);
1515         spin_unlock(&obd->obd_dev_lock);
1516
1517         obd_str2uuid(&doomed_uuid, uuid);
1518         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1519                 CERROR("%s: can't evict myself\n", obd->obd_name);
1520                 cfs_hash_putref(uuid_hash);
1521                 return exports_evicted;
1522         }
1523
1524         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1525
1526         if (doomed_exp == NULL) {
1527                 CERROR("%s: can't disconnect %s: no exports found\n",
1528                        obd->obd_name, uuid);
1529         } else {
1530                 CWARN("%s: evicting %s at adminstrative request\n",
1531                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1532                 class_fail_export(doomed_exp);
1533                 class_export_put(doomed_exp);
1534                 exports_evicted++;
1535         }
1536         cfs_hash_putref(uuid_hash);
1537
1538         return exports_evicted;
1539 }
1540 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1541
1542 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1543 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1544 EXPORT_SYMBOL(class_export_dump_hook);
1545 #endif
1546
1547 static void print_export_data(struct obd_export *exp, const char *status,
1548                               int locks)
1549 {
1550         struct ptlrpc_reply_state *rs;
1551         struct ptlrpc_reply_state *first_reply = NULL;
1552         int nreplies = 0;
1553
1554         spin_lock(&exp->exp_lock);
1555         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1556                             rs_exp_list) {
1557                 if (nreplies == 0)
1558                         first_reply = rs;
1559                 nreplies++;
1560         }
1561         spin_unlock(&exp->exp_lock);
1562
1563         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1564                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1565                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1566                atomic_read(&exp->exp_rpc_count),
1567                atomic_read(&exp->exp_cb_count),
1568                atomic_read(&exp->exp_locks_count),
1569                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1570                nreplies, first_reply, nreplies > 3 ? "..." : "",
1571                exp->exp_last_committed);
1572 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1573         if (locks && class_export_dump_hook != NULL)
1574                 class_export_dump_hook(exp);
1575 #endif
1576 }
1577
1578 void dump_exports(struct obd_device *obd, int locks)
1579 {
1580         struct obd_export *exp;
1581
1582         spin_lock(&obd->obd_dev_lock);
1583         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1584                 print_export_data(exp, "ACTIVE", locks);
1585         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1586                 print_export_data(exp, "UNLINKED", locks);
1587         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1588                 print_export_data(exp, "DELAYED", locks);
1589         spin_unlock(&obd->obd_dev_lock);
1590         spin_lock(&obd_zombie_impexp_lock);
1591         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1592                 print_export_data(exp, "ZOMBIE", locks);
1593         spin_unlock(&obd_zombie_impexp_lock);
1594 }
1595 EXPORT_SYMBOL(dump_exports);
1596
1597 void obd_exports_barrier(struct obd_device *obd)
1598 {
1599         int waited = 2;
1600         LASSERT(list_empty(&obd->obd_exports));
1601         spin_lock(&obd->obd_dev_lock);
1602         while (!list_empty(&obd->obd_unlinked_exports)) {
1603                 spin_unlock(&obd->obd_dev_lock);
1604                 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1605                                                    cfs_time_seconds(waited));
1606                 if (waited > 5 && IS_PO2(waited)) {
1607                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1608                                       "more than %d seconds. "
1609                                       "The obd refcount = %d. Is it stuck?\n",
1610                                       obd->obd_name, waited,
1611                                       atomic_read(&obd->obd_refcount));
1612                         dump_exports(obd, 1);
1613                 }
1614                 waited *= 2;
1615                 spin_lock(&obd->obd_dev_lock);
1616         }
1617         spin_unlock(&obd->obd_dev_lock);
1618 }
1619 EXPORT_SYMBOL(obd_exports_barrier);
1620
/* Number of zombie imports and exports still waiting to be destroyed;
 * modified under obd_zombie_impexp_lock. */
static int zombies_count;
1623
1624 /**
1625  * kill zombie imports and exports
1626  */
1627 void obd_zombie_impexp_cull(void)
1628 {
1629         struct obd_import *import;
1630         struct obd_export *export;
1631         ENTRY;
1632
1633         do {
1634                 spin_lock(&obd_zombie_impexp_lock);
1635
1636                 import = NULL;
1637                 if (!list_empty(&obd_zombie_imports)) {
1638                         import = list_entry(obd_zombie_imports.next,
1639                                             struct obd_import,
1640                                             imp_zombie_chain);
1641                         list_del_init(&import->imp_zombie_chain);
1642                 }
1643
1644                 export = NULL;
1645                 if (!list_empty(&obd_zombie_exports)) {
1646                         export = list_entry(obd_zombie_exports.next,
1647                                             struct obd_export,
1648                                             exp_obd_chain);
1649                         list_del_init(&export->exp_obd_chain);
1650                 }
1651
1652                 spin_unlock(&obd_zombie_impexp_lock);
1653
1654                 if (import != NULL) {
1655                         class_import_destroy(import);
1656                         spin_lock(&obd_zombie_impexp_lock);
1657                         zombies_count--;
1658                         spin_unlock(&obd_zombie_impexp_lock);
1659                 }
1660
1661                 if (export != NULL) {
1662                         class_export_destroy(export);
1663                         spin_lock(&obd_zombie_impexp_lock);
1664                         zombies_count--;
1665                         spin_unlock(&obd_zombie_impexp_lock);
1666                 }
1667
1668                 cond_resched();
1669         } while (import != NULL || export != NULL);
1670         EXIT;
1671 }
1672
1673 static struct completion        obd_zombie_start;
1674 static struct completion        obd_zombie_stop;
1675 static unsigned long            obd_zombie_flags;
1676 static wait_queue_head_t        obd_zombie_waitq;
1677 static pid_t                    obd_zombie_pid;
1678
1679 enum {
1680         OBD_ZOMBIE_STOP         = 0x0001,
1681 };
1682
1683 /**
1684  * check for work for kill zombie import/export thread.
1685  */
1686 static int obd_zombie_impexp_check(void *arg)
1687 {
1688         int rc;
1689
1690         spin_lock(&obd_zombie_impexp_lock);
1691         rc = (zombies_count == 0) &&
1692              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1693         spin_unlock(&obd_zombie_impexp_lock);
1694
1695         RETURN(rc);
1696 }
1697
1698 /**
1699  * Add export to the obd_zombe thread and notify it.
1700  */
1701 static void obd_zombie_export_add(struct obd_export *exp) {
1702         spin_lock(&exp->exp_obd->obd_dev_lock);
1703         LASSERT(!list_empty(&exp->exp_obd_chain));
1704         list_del_init(&exp->exp_obd_chain);
1705         spin_unlock(&exp->exp_obd->obd_dev_lock);
1706         spin_lock(&obd_zombie_impexp_lock);
1707         zombies_count++;
1708         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1709         spin_unlock(&obd_zombie_impexp_lock);
1710
1711         obd_zombie_impexp_notify();
1712 }
1713
1714 /**
1715  * Add import to the obd_zombe thread and notify it.
1716  */
1717 static void obd_zombie_import_add(struct obd_import *imp) {
1718         LASSERT(imp->imp_sec == NULL);
1719         LASSERT(imp->imp_rq_pool == NULL);
1720         spin_lock(&obd_zombie_impexp_lock);
1721         LASSERT(list_empty(&imp->imp_zombie_chain));
1722         zombies_count++;
1723         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1724         spin_unlock(&obd_zombie_impexp_lock);
1725
1726         obd_zombie_impexp_notify();
1727 }
1728
1729 /**
1730  * notify import/export destroy thread about new zombie.
1731  */
1732 static void obd_zombie_impexp_notify(void)
1733 {
1734         /*
1735          * Make sure obd_zomebie_impexp_thread get this notification.
1736          * It is possible this signal only get by obd_zombie_barrier, and
1737          * barrier gulps this notification and sleeps away and hangs ensues
1738          */
1739         wake_up_all(&obd_zombie_waitq);
1740 }
1741
1742 /**
1743  * check whether obd_zombie is idle
1744  */
1745 static int obd_zombie_is_idle(void)
1746 {
1747         int rc;
1748
1749         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1750         spin_lock(&obd_zombie_impexp_lock);
1751         rc = (zombies_count == 0);
1752         spin_unlock(&obd_zombie_impexp_lock);
1753         return rc;
1754 }
1755
1756 /**
1757  * wait when obd_zombie import/export queues become empty
1758  */
1759 void obd_zombie_barrier(void)
1760 {
1761         struct l_wait_info lwi = { 0 };
1762
1763         if (obd_zombie_pid == current_pid())
1764                 /* don't wait for myself */
1765                 return;
1766         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1767 }
1768 EXPORT_SYMBOL(obd_zombie_barrier);
1769
1770 #ifdef __KERNEL__
1771
1772 /**
1773  * destroy zombie export/import thread.
1774  */
1775 static int obd_zombie_impexp_thread(void *unused)
1776 {
1777         unshare_fs_struct();
1778         complete(&obd_zombie_start);
1779
1780         obd_zombie_pid = current_pid();
1781
1782         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1783                 struct l_wait_info lwi = { 0 };
1784
1785                 l_wait_event(obd_zombie_waitq,
1786                              !obd_zombie_impexp_check(NULL), &lwi);
1787                 obd_zombie_impexp_cull();
1788
1789                 /*
1790                  * Notify obd_zombie_barrier callers that queues
1791                  * may be empty.
1792                  */
1793                 wake_up(&obd_zombie_waitq);
1794         }
1795
1796         complete(&obd_zombie_stop);
1797
1798         RETURN(0);
1799 }
1800
1801 #else /* ! KERNEL */
1802
1803 static atomic_t zombie_recur = ATOMIC_INIT(0);
1804 static void *obd_zombie_impexp_work_cb;
1805 static void *obd_zombie_impexp_idle_cb;
1806
1807 int obd_zombie_impexp_kill(void *arg)
1808 {
1809         int rc = 0;
1810
1811         if (atomic_inc_return(&zombie_recur) == 1) {
1812                 obd_zombie_impexp_cull();
1813                 rc = 1;
1814         }
1815         atomic_dec(&zombie_recur);
1816         return rc;
1817 }
1818
1819 #endif
1820
1821 /**
1822  * start destroy zombie import/export thread
1823  */
1824 int obd_zombie_impexp_init(void)
1825 {
1826 #ifdef __KERNEL__
1827         struct task_struct *task;
1828 #endif
1829
1830         INIT_LIST_HEAD(&obd_zombie_imports);
1831
1832         INIT_LIST_HEAD(&obd_zombie_exports);
1833         spin_lock_init(&obd_zombie_impexp_lock);
1834         init_completion(&obd_zombie_start);
1835         init_completion(&obd_zombie_stop);
1836         init_waitqueue_head(&obd_zombie_waitq);
1837         obd_zombie_pid = 0;
1838
1839 #ifdef __KERNEL__
1840         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1841         if (IS_ERR(task))
1842                 RETURN(PTR_ERR(task));
1843
1844         wait_for_completion(&obd_zombie_start);
1845 #else
1846
1847         obd_zombie_impexp_work_cb =
1848                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1849                                                  &obd_zombie_impexp_kill, NULL);
1850
1851         obd_zombie_impexp_idle_cb =
1852                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1853                                                  &obd_zombie_impexp_check, NULL);
1854 #endif
1855         RETURN(0);
1856 }
1857 /**
1858  * stop destroy zombie import/export thread
1859  */
1860 void obd_zombie_impexp_stop(void)
1861 {
1862         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1863         obd_zombie_impexp_notify();
1864 #ifdef __KERNEL__
1865         wait_for_completion(&obd_zombie_stop);
1866 #else
1867         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1868         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1869 #endif
1870 }
1871
1872 /***** Kernel-userspace comm helpers *******/
1873
1874 /* Get length of entire message, including header */
1875 int kuc_len(int payload_len)
1876 {
1877         return sizeof(struct kuc_hdr) + payload_len;
1878 }
1879 EXPORT_SYMBOL(kuc_len);
1880
1881 /* Get a pointer to kuc header, given a ptr to the payload
1882  * @param p Pointer to payload area
1883  * @returns Pointer to kuc header
1884  */
1885 struct kuc_hdr * kuc_ptr(void *p)
1886 {
1887         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1888         LASSERT(lh->kuc_magic == KUC_MAGIC);
1889         return lh;
1890 }
1891 EXPORT_SYMBOL(kuc_ptr);
1892
1893 /* Test if payload is part of kuc message
1894  * @param p Pointer to payload area
1895  * @returns boolean
1896  */
1897 int kuc_ispayload(void *p)
1898 {
1899         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1900
1901         if (kh->kuc_magic == KUC_MAGIC)
1902                 return 1;
1903         else
1904                 return 0;
1905 }
1906 EXPORT_SYMBOL(kuc_ispayload);
1907
1908 /* Alloc space for a message, and fill in header
1909  * @return Pointer to payload area
1910  */
1911 void *kuc_alloc(int payload_len, int transport, int type)
1912 {
1913         struct kuc_hdr *lh;
1914         int len = kuc_len(payload_len);
1915
1916         OBD_ALLOC(lh, len);
1917         if (lh == NULL)
1918                 return ERR_PTR(-ENOMEM);
1919
1920         lh->kuc_magic = KUC_MAGIC;
1921         lh->kuc_transport = transport;
1922         lh->kuc_msgtype = type;
1923         lh->kuc_msglen = len;
1924
1925         return (void *)(lh + 1);
1926 }
1927 EXPORT_SYMBOL(kuc_alloc);
1928
/* Free a message previously obtained from kuc_alloc()
 *
 * Not declared "inline": the function is exported, so this file has to
 * provide an ordinary external definition.  Under C99/C11 rules a plain
 * "inline" definition does not emit one.
 *
 * @param p Pointer to payload area (kuc_ptr() asserts on its header magic)
 * @param payload_len Payload length; the header size is added to it to
 *                    obtain the size handed to OBD_FREE
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1936
1937 struct obd_request_slot_waiter {
1938         struct list_head        orsw_entry;
1939         wait_queue_head_t       orsw_waitq;
1940         bool                    orsw_signaled;
1941 };
1942
1943 static bool obd_request_slot_avail(struct client_obd *cli,
1944                                    struct obd_request_slot_waiter *orsw)
1945 {
1946         bool avail;
1947
1948         client_obd_list_lock(&cli->cl_loi_list_lock);
1949         avail = !!list_empty(&orsw->orsw_entry);
1950         client_obd_list_unlock(&cli->cl_loi_list_lock);
1951
1952         return avail;
1953 };
1954
1955 /*
1956  * For network flow control, the RPC sponsor needs to acquire a credit
1957  * before sending the RPC. The credits count for a connection is defined
1958  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1959  * the subsequent RPC sponsors need to wait until others released their
1960  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1961  */
1962 int obd_get_request_slot(struct client_obd *cli)
1963 {
1964         struct obd_request_slot_waiter   orsw;
1965         struct l_wait_info               lwi;
1966         int                              rc;
1967
1968         client_obd_list_lock(&cli->cl_loi_list_lock);
1969         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1970                 cli->cl_r_in_flight++;
1971                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1972                 return 0;
1973         }
1974
1975         init_waitqueue_head(&orsw.orsw_waitq);
1976         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1977         orsw.orsw_signaled = false;
1978         client_obd_list_unlock(&cli->cl_loi_list_lock);
1979
1980         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1981         rc = l_wait_event(orsw.orsw_waitq,
1982                           obd_request_slot_avail(cli, &orsw) ||
1983                           orsw.orsw_signaled,
1984                           &lwi);
1985
1986         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1987          * freed but other (such as obd_put_request_slot) is using it. */
1988         client_obd_list_lock(&cli->cl_loi_list_lock);
1989         if (rc != 0) {
1990                 if (!orsw.orsw_signaled) {
1991                         if (list_empty(&orsw.orsw_entry))
1992                                 cli->cl_r_in_flight--;
1993                         else
1994                                 list_del(&orsw.orsw_entry);
1995                 }
1996         }
1997
1998         if (orsw.orsw_signaled) {
1999                 LASSERT(list_empty(&orsw.orsw_entry));
2000
2001                 rc = -EINTR;
2002         }
2003         client_obd_list_unlock(&cli->cl_loi_list_lock);
2004
2005         return rc;
2006 }
2007 EXPORT_SYMBOL(obd_get_request_slot);
2008
2009 void obd_put_request_slot(struct client_obd *cli)
2010 {
2011         struct obd_request_slot_waiter *orsw;
2012
2013         client_obd_list_lock(&cli->cl_loi_list_lock);
2014         cli->cl_r_in_flight--;
2015
2016         /* If there is free slot, wakeup the first waiter. */
2017         if (!list_empty(&cli->cl_loi_read_list) &&
2018             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2019                 orsw = list_entry(cli->cl_loi_read_list.next,
2020                                   struct obd_request_slot_waiter, orsw_entry);
2021                 list_del_init(&orsw->orsw_entry);
2022                 cli->cl_r_in_flight++;
2023                 wake_up(&orsw->orsw_waitq);
2024         }
2025         client_obd_list_unlock(&cli->cl_loi_list_lock);
2026 }
2027 EXPORT_SYMBOL(obd_put_request_slot);
2028
2029 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2030 {
2031         return cli->cl_max_rpcs_in_flight;
2032 }
2033 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2034
2035 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2036 {
2037         struct obd_request_slot_waiter *orsw;
2038         __u32                           old;
2039         int                             diff;
2040         int                             i;
2041
2042         if (max > OBD_MAX_RIF_MAX || max < 1)
2043                 return -ERANGE;
2044
2045         client_obd_list_lock(&cli->cl_loi_list_lock);
2046         old = cli->cl_max_rpcs_in_flight;
2047         cli->cl_max_rpcs_in_flight = max;
2048         diff = max - old;
2049
2050         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2051         for (i = 0; i < diff; i++) {
2052                 if (list_empty(&cli->cl_loi_read_list))
2053                         break;
2054
2055                 orsw = list_entry(cli->cl_loi_read_list.next,
2056                                   struct obd_request_slot_waiter, orsw_entry);
2057                 list_del_init(&orsw->orsw_entry);
2058                 cli->cl_r_in_flight++;
2059                 wake_up(&orsw->orsw_waitq);
2060         }
2061         client_obd_list_unlock(&cli->cl_loi_list_lock);
2062
2063         return 0;
2064 }
2065 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);