Whamcloud - gitweb
LU-3105 mdc: remove capa support
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43
44 #include <linux/kthread.h>
45 #include <obd_class.h>
46 #include <lprocfs_status.h>
47 #include <lustre_kernelcomm.h>
48
49 spinlock_t obd_types_lock;
50
51 static struct kmem_cache *obd_device_cachep;
52 struct kmem_cache *obdo_cachep;
53 EXPORT_SYMBOL(obdo_cachep);
54 static struct kmem_cache *import_cachep;
55
56 static struct list_head obd_zombie_imports;
57 static struct list_head obd_zombie_exports;
58 static spinlock_t  obd_zombie_impexp_lock;
59
60 static void obd_zombie_impexp_notify(void);
61 static void obd_zombie_export_add(struct obd_export *exp);
62 static void obd_zombie_import_add(struct obd_import *imp);
63 static void print_export_data(struct obd_export *exp,
64                               const char *status, int locks);
65
66 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
68
69 /*
70  * support functions: we could use inter-module communication, but this
71  * is more portable to other OS's
72  */
73 static struct obd_device *obd_device_alloc(void)
74 {
75         struct obd_device *obd;
76
77         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
78         if (obd != NULL) {
79                 obd->obd_magic = OBD_DEVICE_MAGIC;
80         }
81         return obd;
82 }
83
84 static void obd_device_free(struct obd_device *obd)
85 {
86         LASSERT(obd != NULL);
87         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
89         if (obd->obd_namespace != NULL) {
90                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91                        obd, obd->obd_namespace, obd->obd_force);
92                 LBUG();
93         }
94         lu_ref_fini(&obd->obd_reference);
95         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
96 }
97
98 struct obd_type *class_search_type(const char *name)
99 {
100         struct list_head *tmp;
101         struct obd_type *type;
102
103         spin_lock(&obd_types_lock);
104         list_for_each(tmp, &obd_types) {
105                 type = list_entry(tmp, struct obd_type, typ_chain);
106                 if (strcmp(type->typ_name, name) == 0) {
107                         spin_unlock(&obd_types_lock);
108                         return type;
109                 }
110         }
111         spin_unlock(&obd_types_lock);
112         return NULL;
113 }
114 EXPORT_SYMBOL(class_search_type);
115
116 struct obd_type *class_get_type(const char *name)
117 {
118         struct obd_type *type = class_search_type(name);
119
120 #ifdef HAVE_MODULE_LOADING_SUPPORT
121         if (!type) {
122                 const char *modname = name;
123
124                 if (strcmp(modname, "obdfilter") == 0)
125                         modname = "ofd";
126
127                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
128                         modname = LUSTRE_OSP_NAME;
129
130                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
131                         modname = LUSTRE_MDT_NAME;
132
133                 if (!request_module("%s", modname)) {
134                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
135                         type = class_search_type(name);
136                 } else {
137                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
138                                            modname);
139                 }
140         }
141 #endif
142         if (type) {
143                 spin_lock(&type->obd_type_lock);
144                 type->typ_refcnt++;
145                 try_module_get(type->typ_dt_ops->o_owner);
146                 spin_unlock(&type->obd_type_lock);
147         }
148         return type;
149 }
150
151 void class_put_type(struct obd_type *type)
152 {
153         LASSERT(type);
154         spin_lock(&type->obd_type_lock);
155         type->typ_refcnt--;
156         module_put(type->typ_dt_ops->o_owner);
157         spin_unlock(&type->obd_type_lock);
158 }
159
160 #define CLASS_MAX_NAME 1024
161
162 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
163                         bool enable_proc, struct lprocfs_vars *vars,
164                         const char *name, struct lu_device_type *ldt)
165 {
166         struct obd_type *type;
167         int rc = 0;
168         ENTRY;
169
170         /* sanity check */
171         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
172
173         if (class_search_type(name)) {
174                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
175                 RETURN(-EEXIST);
176         }
177
178         rc = -ENOMEM;
179         OBD_ALLOC(type, sizeof(*type));
180         if (type == NULL)
181                 RETURN(rc);
182
183         OBD_ALLOC_PTR(type->typ_dt_ops);
184         OBD_ALLOC_PTR(type->typ_md_ops);
185         OBD_ALLOC(type->typ_name, strlen(name) + 1);
186
187         if (type->typ_dt_ops == NULL ||
188             type->typ_md_ops == NULL ||
189             type->typ_name == NULL)
190                 GOTO (failed, rc);
191
192         *(type->typ_dt_ops) = *dt_ops;
193         /* md_ops is optional */
194         if (md_ops)
195                 *(type->typ_md_ops) = *md_ops;
196         strcpy(type->typ_name, name);
197         spin_lock_init(&type->obd_type_lock);
198
199 #ifdef CONFIG_PROC_FS
200         if (enable_proc) {
201                 type->typ_procroot = lprocfs_register(type->typ_name,
202                                                       proc_lustre_root,
203                                                       vars, type);
204                 if (IS_ERR(type->typ_procroot)) {
205                         rc = PTR_ERR(type->typ_procroot);
206                         type->typ_procroot = NULL;
207                         GOTO(failed, rc);
208                 }
209         }
210 #endif
211         if (ldt != NULL) {
212                 type->typ_lu = ldt;
213                 rc = lu_device_type_init(ldt);
214                 if (rc != 0)
215                         GOTO (failed, rc);
216         }
217
218         spin_lock(&obd_types_lock);
219         list_add(&type->typ_chain, &obd_types);
220         spin_unlock(&obd_types_lock);
221
222         RETURN (0);
223
224 failed:
225         if (type->typ_name != NULL) {
226 #ifdef CONFIG_PROC_FS
227                 if (type->typ_procroot != NULL)
228                         remove_proc_subtree(type->typ_name, proc_lustre_root);
229 #endif
230                 OBD_FREE(type->typ_name, strlen(name) + 1);
231         }
232         if (type->typ_md_ops != NULL)
233                 OBD_FREE_PTR(type->typ_md_ops);
234         if (type->typ_dt_ops != NULL)
235                 OBD_FREE_PTR(type->typ_dt_ops);
236         OBD_FREE(type, sizeof(*type));
237         RETURN(rc);
238 }
239 EXPORT_SYMBOL(class_register_type);
240
241 int class_unregister_type(const char *name)
242 {
243         struct obd_type *type = class_search_type(name);
244         ENTRY;
245
246         if (!type) {
247                 CERROR("unknown obd type\n");
248                 RETURN(-EINVAL);
249         }
250
251         if (type->typ_refcnt) {
252                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
253                 /* This is a bad situation, let's make the best of it */
254                 /* Remove ops, but leave the name for debugging */
255                 OBD_FREE_PTR(type->typ_dt_ops);
256                 OBD_FREE_PTR(type->typ_md_ops);
257                 RETURN(-EBUSY);
258         }
259
260         /* we do not use type->typ_procroot as for compatibility purposes
261          * other modules can share names (i.e. lod can use lov entry). so
262          * we can't reference pointer as it can get invalided when another
263          * module removes the entry */
264 #ifdef CONFIG_PROC_FS
265         if (type->typ_procroot != NULL)
266                 remove_proc_subtree(type->typ_name, proc_lustre_root);
267         if (type->typ_procsym != NULL)
268                 lprocfs_remove(&type->typ_procsym);
269 #endif
270         if (type->typ_lu)
271                 lu_device_type_fini(type->typ_lu);
272
273         spin_lock(&obd_types_lock);
274         list_del(&type->typ_chain);
275         spin_unlock(&obd_types_lock);
276         OBD_FREE(type->typ_name, strlen(name) + 1);
277         if (type->typ_dt_ops != NULL)
278                 OBD_FREE_PTR(type->typ_dt_ops);
279         if (type->typ_md_ops != NULL)
280                 OBD_FREE_PTR(type->typ_md_ops);
281         OBD_FREE(type, sizeof(*type));
282         RETURN(0);
283 } /* class_unregister_type */
284 EXPORT_SYMBOL(class_unregister_type);
285
286 /**
287  * Create a new obd device.
288  *
289  * Find an empty slot in ::obd_devs[], create a new obd device in it.
290  *
291  * \param[in] type_name obd device type string.
292  * \param[in] name      obd device name.
293  *
294  * \retval NULL if create fails, otherwise return the obd device
295  *         pointer created.
296  */
297 struct obd_device *class_newdev(const char *type_name, const char *name)
298 {
299         struct obd_device *result = NULL;
300         struct obd_device *newdev;
301         struct obd_type *type = NULL;
302         int i;
303         int new_obd_minor = 0;
304         ENTRY;
305
306         if (strlen(name) >= MAX_OBD_NAME) {
307                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
308                 RETURN(ERR_PTR(-EINVAL));
309         }
310
311         type = class_get_type(type_name);
312         if (type == NULL){
313                 CERROR("OBD: unknown type: %s\n", type_name);
314                 RETURN(ERR_PTR(-ENODEV));
315         }
316
317         newdev = obd_device_alloc();
318         if (newdev == NULL)
319                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
320
321         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
322
323         write_lock(&obd_dev_lock);
324         for (i = 0; i < class_devno_max(); i++) {
325                 struct obd_device *obd = class_num2obd(i);
326
327                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
328                         CERROR("Device %s already exists at %d, won't add\n",
329                                name, i);
330                         if (result) {
331                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
332                                          "%p obd_magic %08x != %08x\n", result,
333                                          result->obd_magic, OBD_DEVICE_MAGIC);
334                                 LASSERTF(result->obd_minor == new_obd_minor,
335                                          "%p obd_minor %d != %d\n", result,
336                                          result->obd_minor, new_obd_minor);
337
338                                 obd_devs[result->obd_minor] = NULL;
339                                 result->obd_name[0]='\0';
340                          }
341                         result = ERR_PTR(-EEXIST);
342                         break;
343                 }
344                 if (!result && !obd) {
345                         result = newdev;
346                         result->obd_minor = i;
347                         new_obd_minor = i;
348                         result->obd_type = type;
349                         strncpy(result->obd_name, name,
350                                 sizeof(result->obd_name) - 1);
351                         obd_devs[i] = result;
352                 }
353         }
354         write_unlock(&obd_dev_lock);
355
356         if (result == NULL && i >= class_devno_max()) {
357                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
358                        class_devno_max());
359                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
360         }
361
362         if (IS_ERR(result))
363                 GOTO(out, result);
364
365         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
366                result->obd_name, result);
367
368         RETURN(result);
369 out:
370         obd_device_free(newdev);
371 out_type:
372         class_put_type(type);
373         return result;
374 }
375
376 void class_release_dev(struct obd_device *obd)
377 {
378         struct obd_type *obd_type = obd->obd_type;
379
380         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
381                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
382         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
383                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
384         LASSERT(obd_type != NULL);
385
386         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
387                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
388
389         write_lock(&obd_dev_lock);
390         obd_devs[obd->obd_minor] = NULL;
391         write_unlock(&obd_dev_lock);
392         obd_device_free(obd);
393
394         class_put_type(obd_type);
395 }
396
397 int class_name2dev(const char *name)
398 {
399         int i;
400
401         if (!name)
402                 return -1;
403
404         read_lock(&obd_dev_lock);
405         for (i = 0; i < class_devno_max(); i++) {
406                 struct obd_device *obd = class_num2obd(i);
407
408                 if (obd && strcmp(name, obd->obd_name) == 0) {
409                         /* Make sure we finished attaching before we give
410                            out any references */
411                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
412                         if (obd->obd_attached) {
413                                 read_unlock(&obd_dev_lock);
414                                 return i;
415                         }
416                         break;
417                 }
418         }
419         read_unlock(&obd_dev_lock);
420
421         return -1;
422 }
423
424 struct obd_device *class_name2obd(const char *name)
425 {
426         int dev = class_name2dev(name);
427
428         if (dev < 0 || dev > class_devno_max())
429                 return NULL;
430         return class_num2obd(dev);
431 }
432 EXPORT_SYMBOL(class_name2obd);
433
434 int class_uuid2dev(struct obd_uuid *uuid)
435 {
436         int i;
437
438         read_lock(&obd_dev_lock);
439         for (i = 0; i < class_devno_max(); i++) {
440                 struct obd_device *obd = class_num2obd(i);
441
442                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
443                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
444                         read_unlock(&obd_dev_lock);
445                         return i;
446                 }
447         }
448         read_unlock(&obd_dev_lock);
449
450         return -1;
451 }
452
453 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
454 {
455         int dev = class_uuid2dev(uuid);
456         if (dev < 0)
457                 return NULL;
458         return class_num2obd(dev);
459 }
460 EXPORT_SYMBOL(class_uuid2obd);
461
462 /**
463  * Get obd device from ::obd_devs[]
464  *
465  * \param num [in] array index
466  *
467  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
468  *         otherwise return the obd device there.
469  */
470 struct obd_device *class_num2obd(int num)
471 {
472         struct obd_device *obd = NULL;
473
474         if (num < class_devno_max()) {
475                 obd = obd_devs[num];
476                 if (obd == NULL)
477                         return NULL;
478
479                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
480                          "%p obd_magic %08x != %08x\n",
481                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
482                 LASSERTF(obd->obd_minor == num,
483                          "%p obd_minor %0d != %0d\n",
484                          obd, obd->obd_minor, num);
485         }
486
487         return obd;
488 }
489
490 /**
491  * Get obd devices count. Device in any
492  *    state are counted
493  * \retval obd device count
494  */
495 int get_devices_count(void)
496 {
497         int index, max_index = class_devno_max(), dev_count = 0;
498
499         read_lock(&obd_dev_lock);
500         for (index = 0; index <= max_index; index++) {
501                 struct obd_device *obd = class_num2obd(index);
502                 if (obd != NULL)
503                         dev_count++;
504         }
505         read_unlock(&obd_dev_lock);
506
507         return dev_count;
508 }
509 EXPORT_SYMBOL(get_devices_count);
510
511 void class_obd_list(void)
512 {
513         char *status;
514         int i;
515
516         read_lock(&obd_dev_lock);
517         for (i = 0; i < class_devno_max(); i++) {
518                 struct obd_device *obd = class_num2obd(i);
519
520                 if (obd == NULL)
521                         continue;
522                 if (obd->obd_stopping)
523                         status = "ST";
524                 else if (obd->obd_set_up)
525                         status = "UP";
526                 else if (obd->obd_attached)
527                         status = "AT";
528                 else
529                         status = "--";
530                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
531                          i, status, obd->obd_type->typ_name,
532                          obd->obd_name, obd->obd_uuid.uuid,
533                          atomic_read(&obd->obd_refcount));
534         }
535         read_unlock(&obd_dev_lock);
536         return;
537 }
538
539 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
540    specified, then only the client with that uuid is returned,
541    otherwise any client connected to the tgt is returned. */
542 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
543                                           const char * typ_name,
544                                           struct obd_uuid *grp_uuid)
545 {
546         int i;
547
548         read_lock(&obd_dev_lock);
549         for (i = 0; i < class_devno_max(); i++) {
550                 struct obd_device *obd = class_num2obd(i);
551
552                 if (obd == NULL)
553                         continue;
554                 if ((strncmp(obd->obd_type->typ_name, typ_name,
555                              strlen(typ_name)) == 0)) {
556                         if (obd_uuid_equals(tgt_uuid,
557                                             &obd->u.cli.cl_target_uuid) &&
558                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
559                                                          &obd->obd_uuid) : 1)) {
560                                 read_unlock(&obd_dev_lock);
561                                 return obd;
562                         }
563                 }
564         }
565         read_unlock(&obd_dev_lock);
566
567         return NULL;
568 }
569 EXPORT_SYMBOL(class_find_client_obd);
570
571 /* Iterate the obd_device list looking devices have grp_uuid. Start
572    searching at *next, and if a device is found, the next index to look
573    at is saved in *next. If next is NULL, then the first matching device
574    will always be returned. */
575 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
576 {
577         int i;
578
579         if (next == NULL)
580                 i = 0;
581         else if (*next >= 0 && *next < class_devno_max())
582                 i = *next;
583         else
584                 return NULL;
585
586         read_lock(&obd_dev_lock);
587         for (; i < class_devno_max(); i++) {
588                 struct obd_device *obd = class_num2obd(i);
589
590                 if (obd == NULL)
591                         continue;
592                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
593                         if (next != NULL)
594                                 *next = i+1;
595                         read_unlock(&obd_dev_lock);
596                         return obd;
597                 }
598         }
599         read_unlock(&obd_dev_lock);
600
601         return NULL;
602 }
603 EXPORT_SYMBOL(class_devices_in_group);
604
605 /**
606  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
607  * adjust sptlrpc settings accordingly.
608  */
609 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
610 {
611         struct obd_device  *obd;
612         const char         *type;
613         int                 i, rc = 0, rc2;
614
615         LASSERT(namelen > 0);
616
617         read_lock(&obd_dev_lock);
618         for (i = 0; i < class_devno_max(); i++) {
619                 obd = class_num2obd(i);
620
621                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
622                         continue;
623
624                 /* only notify mdc, osc, mdt, ost */
625                 type = obd->obd_type->typ_name;
626                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
627                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
628                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
629                     strcmp(type, LUSTRE_OST_NAME) != 0)
630                         continue;
631
632                 if (strncmp(obd->obd_name, fsname, namelen))
633                         continue;
634
635                 class_incref(obd, __FUNCTION__, obd);
636                 read_unlock(&obd_dev_lock);
637                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
638                                          sizeof(KEY_SPTLRPC_CONF),
639                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
640                 rc = rc ? rc : rc2;
641                 class_decref(obd, __FUNCTION__, obd);
642                 read_lock(&obd_dev_lock);
643         }
644         read_unlock(&obd_dev_lock);
645         return rc;
646 }
647 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
648
649 void obd_cleanup_caches(void)
650 {
651         ENTRY;
652         if (obd_device_cachep) {
653                 kmem_cache_destroy(obd_device_cachep);
654                 obd_device_cachep = NULL;
655         }
656         if (obdo_cachep) {
657                 kmem_cache_destroy(obdo_cachep);
658                 obdo_cachep = NULL;
659         }
660         if (import_cachep) {
661                 kmem_cache_destroy(import_cachep);
662                 import_cachep = NULL;
663         }
664
665         EXIT;
666 }
667
668 int obd_init_caches(void)
669 {
670         int rc;
671         ENTRY;
672
673         LASSERT(obd_device_cachep == NULL);
674         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
675                                               sizeof(struct obd_device),
676                                               0, 0, NULL);
677         if (!obd_device_cachep)
678                 GOTO(out, rc = -ENOMEM);
679
680         LASSERT(obdo_cachep == NULL);
681         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
682                                         0, 0, NULL);
683         if (!obdo_cachep)
684                 GOTO(out, rc = -ENOMEM);
685
686         LASSERT(import_cachep == NULL);
687         import_cachep = kmem_cache_create("ll_import_cache",
688                                           sizeof(struct obd_import),
689                                           0, 0, NULL);
690         if (!import_cachep)
691                 GOTO(out, rc = -ENOMEM);
692
693         RETURN(0);
694 out:
695         obd_cleanup_caches();
696         RETURN(rc);
697 }
698
699 /* map connection to client */
700 struct obd_export *class_conn2export(struct lustre_handle *conn)
701 {
702         struct obd_export *export;
703         ENTRY;
704
705         if (!conn) {
706                 CDEBUG(D_CACHE, "looking for null handle\n");
707                 RETURN(NULL);
708         }
709
710         if (conn->cookie == -1) {  /* this means assign a new connection */
711                 CDEBUG(D_CACHE, "want a new connection\n");
712                 RETURN(NULL);
713         }
714
715         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
716         export = class_handle2object(conn->cookie, NULL);
717         RETURN(export);
718 }
719 EXPORT_SYMBOL(class_conn2export);
720
721 struct obd_device *class_exp2obd(struct obd_export *exp)
722 {
723         if (exp)
724                 return exp->exp_obd;
725         return NULL;
726 }
727 EXPORT_SYMBOL(class_exp2obd);
728
729 struct obd_device *class_conn2obd(struct lustre_handle *conn)
730 {
731         struct obd_export *export;
732         export = class_conn2export(conn);
733         if (export) {
734                 struct obd_device *obd = export->exp_obd;
735                 class_export_put(export);
736                 return obd;
737         }
738         return NULL;
739 }
740
741 struct obd_import *class_exp2cliimp(struct obd_export *exp)
742 {
743         struct obd_device *obd = exp->exp_obd;
744         if (obd == NULL)
745                 return NULL;
746         return obd->u.cli.cl_import;
747 }
748 EXPORT_SYMBOL(class_exp2cliimp);
749
750 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
751 {
752         struct obd_device *obd = class_conn2obd(conn);
753         if (obd == NULL)
754                 return NULL;
755         return obd->u.cli.cl_import;
756 }
757
758 /* Export management functions */
759 static void class_export_destroy(struct obd_export *exp)
760 {
761         struct obd_device *obd = exp->exp_obd;
762         ENTRY;
763
764         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
765         LASSERT(obd != NULL);
766
767         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
768                exp->exp_client_uuid.uuid, obd->obd_name);
769
770         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
771         if (exp->exp_connection)
772                 ptlrpc_put_connection_superhack(exp->exp_connection);
773
774         LASSERT(list_empty(&exp->exp_outstanding_replies));
775         LASSERT(list_empty(&exp->exp_uncommitted_replies));
776         LASSERT(list_empty(&exp->exp_req_replay_queue));
777         LASSERT(list_empty(&exp->exp_hp_rpcs));
778         obd_destroy_export(exp);
779         class_decref(obd, "export", exp);
780
781         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
782         EXIT;
783 }
784
785 static void export_handle_addref(void *export)
786 {
787         class_export_get(export);
788 }
789
790 static struct portals_handle_ops export_handle_ops = {
791         .hop_addref = export_handle_addref,
792         .hop_free   = NULL,
793 };
794
795 struct obd_export *class_export_get(struct obd_export *exp)
796 {
797         atomic_inc(&exp->exp_refcount);
798         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
799                atomic_read(&exp->exp_refcount));
800         return exp;
801 }
802 EXPORT_SYMBOL(class_export_get);
803
804 void class_export_put(struct obd_export *exp)
805 {
806         LASSERT(exp != NULL);
807         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
808         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
809                atomic_read(&exp->exp_refcount) - 1);
810
811         if (atomic_dec_and_test(&exp->exp_refcount)) {
812                 LASSERT(!list_empty(&exp->exp_obd_chain));
813                 CDEBUG(D_IOCTL, "final put %p/%s\n",
814                        exp, exp->exp_client_uuid.uuid);
815
816                 /* release nid stat refererence */
817                 lprocfs_exp_cleanup(exp);
818
819                 obd_zombie_export_add(exp);
820         }
821 }
822 EXPORT_SYMBOL(class_export_put);
823
824 /* Creates a new export, adds it to the hash table, and returns a
825  * pointer to it. The refcount is 2: one for the hash reference, and
826  * one for the pointer returned by this function. */
827 struct obd_export *class_new_export(struct obd_device *obd,
828                                     struct obd_uuid *cluuid)
829 {
830         struct obd_export *export;
831         struct cfs_hash *hash = NULL;
832         int rc = 0;
833         ENTRY;
834
835         OBD_ALLOC_PTR(export);
836         if (!export)
837                 return ERR_PTR(-ENOMEM);
838
839         export->exp_conn_cnt = 0;
840         export->exp_lock_hash = NULL;
841         export->exp_flock_hash = NULL;
842         atomic_set(&export->exp_refcount, 2);
843         atomic_set(&export->exp_rpc_count, 0);
844         atomic_set(&export->exp_cb_count, 0);
845         atomic_set(&export->exp_locks_count, 0);
846 #if LUSTRE_TRACKS_LOCK_EXP_REFS
847         INIT_LIST_HEAD(&export->exp_locks_list);
848         spin_lock_init(&export->exp_locks_list_guard);
849 #endif
850         atomic_set(&export->exp_replay_count, 0);
851         export->exp_obd = obd;
852         INIT_LIST_HEAD(&export->exp_outstanding_replies);
853         spin_lock_init(&export->exp_uncommitted_replies_lock);
854         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
855         INIT_LIST_HEAD(&export->exp_req_replay_queue);
856         INIT_LIST_HEAD(&export->exp_handle.h_link);
857         INIT_LIST_HEAD(&export->exp_hp_rpcs);
858         INIT_LIST_HEAD(&export->exp_reg_rpcs);
859         class_handle_hash(&export->exp_handle, &export_handle_ops);
860         export->exp_last_request_time = cfs_time_current_sec();
861         spin_lock_init(&export->exp_lock);
862         spin_lock_init(&export->exp_rpc_lock);
863         INIT_HLIST_NODE(&export->exp_uuid_hash);
864         INIT_HLIST_NODE(&export->exp_nid_hash);
865         spin_lock_init(&export->exp_bl_list_lock);
866         INIT_LIST_HEAD(&export->exp_bl_list);
867
868         export->exp_sp_peer = LUSTRE_SP_ANY;
869         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
870         export->exp_client_uuid = *cluuid;
871         obd_init_export(export);
872
873         spin_lock(&obd->obd_dev_lock);
874         /* shouldn't happen, but might race */
875         if (obd->obd_stopping)
876                 GOTO(exit_unlock, rc = -ENODEV);
877
878         hash = cfs_hash_getref(obd->obd_uuid_hash);
879         if (hash == NULL)
880                 GOTO(exit_unlock, rc = -ENODEV);
881         spin_unlock(&obd->obd_dev_lock);
882
883         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
884                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
885                 if (rc != 0) {
886                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
887                                       obd->obd_name, cluuid->uuid, rc);
888                         GOTO(exit_err, rc = -EALREADY);
889                 }
890         }
891
892         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
893         spin_lock(&obd->obd_dev_lock);
894         if (obd->obd_stopping) {
895                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
896                 GOTO(exit_unlock, rc = -ENODEV);
897         }
898
899         class_incref(obd, "export", export);
900         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
901         list_add_tail(&export->exp_obd_chain_timed,
902                       &export->exp_obd->obd_exports_timed);
903         export->exp_obd->obd_num_exports++;
904         spin_unlock(&obd->obd_dev_lock);
905         cfs_hash_putref(hash);
906         RETURN(export);
907
908 exit_unlock:
909         spin_unlock(&obd->obd_dev_lock);
910 exit_err:
911         if (hash)
912                 cfs_hash_putref(hash);
913         class_handle_unhash(&export->exp_handle);
914         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
915         obd_destroy_export(export);
916         OBD_FREE_PTR(export);
917         return ERR_PTR(rc);
918 }
919 EXPORT_SYMBOL(class_new_export);
920
921 void class_unlink_export(struct obd_export *exp)
922 {
923         class_handle_unhash(&exp->exp_handle);
924
925         spin_lock(&exp->exp_obd->obd_dev_lock);
926         /* delete an uuid-export hashitem from hashtables */
927         if (!hlist_unhashed(&exp->exp_uuid_hash))
928                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
929                              &exp->exp_client_uuid,
930                              &exp->exp_uuid_hash);
931
932         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
933         list_del_init(&exp->exp_obd_chain_timed);
934         exp->exp_obd->obd_num_exports--;
935         spin_unlock(&exp->exp_obd->obd_dev_lock);
936         class_export_put(exp);
937 }
938
939 /* Import management functions */
940 static void class_import_destroy(struct obd_import *imp)
941 {
942         ENTRY;
943
944         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
945                 imp->imp_obd->obd_name);
946
947         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
948
949         ptlrpc_put_connection_superhack(imp->imp_connection);
950
951         while (!list_empty(&imp->imp_conn_list)) {
952                 struct obd_import_conn *imp_conn;
953
954                 imp_conn = list_entry(imp->imp_conn_list.next,
955                                       struct obd_import_conn, oic_item);
956                 list_del_init(&imp_conn->oic_item);
957                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
958                 OBD_FREE(imp_conn, sizeof(*imp_conn));
959         }
960
961         LASSERT(imp->imp_sec == NULL);
962         class_decref(imp->imp_obd, "import", imp);
963         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
964         EXIT;
965 }
966
967 static void import_handle_addref(void *import)
968 {
969         class_import_get(import);
970 }
971
972 static struct portals_handle_ops import_handle_ops = {
973         .hop_addref = import_handle_addref,
974         .hop_free   = NULL,
975 };
976
977 struct obd_import *class_import_get(struct obd_import *import)
978 {
979         atomic_inc(&import->imp_refcount);
980         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
981                atomic_read(&import->imp_refcount),
982                import->imp_obd->obd_name);
983         return import;
984 }
985 EXPORT_SYMBOL(class_import_get);
986
987 void class_import_put(struct obd_import *imp)
988 {
989         ENTRY;
990
991         LASSERT(list_empty(&imp->imp_zombie_chain));
992         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
993
994         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
995                atomic_read(&imp->imp_refcount) - 1,
996                imp->imp_obd->obd_name);
997
998         if (atomic_dec_and_test(&imp->imp_refcount)) {
999                 CDEBUG(D_INFO, "final put import %p\n", imp);
1000                 obd_zombie_import_add(imp);
1001         }
1002
1003         /* catch possible import put race */
1004         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1005         EXIT;
1006 }
1007 EXPORT_SYMBOL(class_import_put);
1008
1009 static void init_imp_at(struct imp_at *at) {
1010         int i;
1011         at_init(&at->iat_net_latency, 0, 0);
1012         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1013                 /* max service estimates are tracked on the server side, so
1014                    don't use the AT history here, just use the last reported
1015                    val. (But keep hist for proc histogram, worst_ever) */
1016                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1017                         AT_FLG_NOHIST);
1018         }
1019 }
1020
1021 struct obd_import *class_new_import(struct obd_device *obd)
1022 {
1023         struct obd_import *imp;
1024
1025         OBD_ALLOC(imp, sizeof(*imp));
1026         if (imp == NULL)
1027                 return NULL;
1028
1029         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1030         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1031         INIT_LIST_HEAD(&imp->imp_replay_list);
1032         INIT_LIST_HEAD(&imp->imp_sending_list);
1033         INIT_LIST_HEAD(&imp->imp_delayed_list);
1034         INIT_LIST_HEAD(&imp->imp_committed_list);
1035         imp->imp_replay_cursor = &imp->imp_committed_list;
1036         spin_lock_init(&imp->imp_lock);
1037         imp->imp_last_success_conn = 0;
1038         imp->imp_state = LUSTRE_IMP_NEW;
1039         imp->imp_obd = class_incref(obd, "import", imp);
1040         mutex_init(&imp->imp_sec_mutex);
1041         init_waitqueue_head(&imp->imp_recovery_waitq);
1042
1043         atomic_set(&imp->imp_refcount, 2);
1044         atomic_set(&imp->imp_unregistering, 0);
1045         atomic_set(&imp->imp_inflight, 0);
1046         atomic_set(&imp->imp_replay_inflight, 0);
1047         atomic_set(&imp->imp_inval_count, 0);
1048         INIT_LIST_HEAD(&imp->imp_conn_list);
1049         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1050         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1051         init_imp_at(&imp->imp_at);
1052
1053         /* the default magic is V2, will be used in connect RPC, and
1054          * then adjusted according to the flags in request/reply. */
1055         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1056
1057         return imp;
1058 }
1059 EXPORT_SYMBOL(class_new_import);
1060
1061 void class_destroy_import(struct obd_import *import)
1062 {
1063         LASSERT(import != NULL);
1064         LASSERT(import != LP_POISON);
1065
1066         class_handle_unhash(&import->imp_handle);
1067
1068         spin_lock(&import->imp_lock);
1069         import->imp_generation++;
1070         spin_unlock(&import->imp_lock);
1071         class_import_put(import);
1072 }
1073 EXPORT_SYMBOL(class_destroy_import);
1074
1075 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1076
1077 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1078 {
1079         spin_lock(&exp->exp_locks_list_guard);
1080
1081         LASSERT(lock->l_exp_refs_nr >= 0);
1082
1083         if (lock->l_exp_refs_target != NULL &&
1084             lock->l_exp_refs_target != exp) {
1085                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1086                               exp, lock, lock->l_exp_refs_target);
1087         }
1088         if ((lock->l_exp_refs_nr ++) == 0) {
1089                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1090                 lock->l_exp_refs_target = exp;
1091         }
1092         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1093                lock, exp, lock->l_exp_refs_nr);
1094         spin_unlock(&exp->exp_locks_list_guard);
1095 }
1096
1097 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1098 {
1099         spin_lock(&exp->exp_locks_list_guard);
1100         LASSERT(lock->l_exp_refs_nr > 0);
1101         if (lock->l_exp_refs_target != exp) {
1102                 LCONSOLE_WARN("lock %p, "
1103                               "mismatching export pointers: %p, %p\n",
1104                               lock, lock->l_exp_refs_target, exp);
1105         }
1106         if (-- lock->l_exp_refs_nr == 0) {
1107                 list_del_init(&lock->l_exp_refs_link);
1108                 lock->l_exp_refs_target = NULL;
1109         }
1110         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1111                lock, exp, lock->l_exp_refs_nr);
1112         spin_unlock(&exp->exp_locks_list_guard);
1113 }
1114 #endif
1115
1116 /* A connection defines an export context in which preallocation can
1117    be managed. This releases the export pointer reference, and returns
1118    the export handle, so the export refcount is 1 when this function
1119    returns. */
1120 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1121                   struct obd_uuid *cluuid)
1122 {
1123         struct obd_export *export;
1124         LASSERT(conn != NULL);
1125         LASSERT(obd != NULL);
1126         LASSERT(cluuid != NULL);
1127         ENTRY;
1128
1129         export = class_new_export(obd, cluuid);
1130         if (IS_ERR(export))
1131                 RETURN(PTR_ERR(export));
1132
1133         conn->cookie = export->exp_handle.h_cookie;
1134         class_export_put(export);
1135
1136         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1137                cluuid->uuid, conn->cookie);
1138         RETURN(0);
1139 }
1140 EXPORT_SYMBOL(class_connect);
1141
1142 /* if export is involved in recovery then clean up related things */
1143 static void class_export_recovery_cleanup(struct obd_export *exp)
1144 {
1145         struct obd_device *obd = exp->exp_obd;
1146
1147         spin_lock(&obd->obd_recovery_task_lock);
1148         if (obd->obd_recovering) {
1149                 if (exp->exp_in_recovery) {
1150                         spin_lock(&exp->exp_lock);
1151                         exp->exp_in_recovery = 0;
1152                         spin_unlock(&exp->exp_lock);
1153                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1154                         atomic_dec(&obd->obd_connected_clients);
1155                 }
1156
1157                 /* if called during recovery then should update
1158                  * obd_stale_clients counter,
1159                  * lightweight exports are not counted */
1160                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1161                         exp->exp_obd->obd_stale_clients++;
1162         }
1163         spin_unlock(&obd->obd_recovery_task_lock);
1164
1165         spin_lock(&exp->exp_lock);
1166         /** Cleanup req replay fields */
1167         if (exp->exp_req_replay_needed) {
1168                 exp->exp_req_replay_needed = 0;
1169
1170                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1171                 atomic_dec(&obd->obd_req_replay_clients);
1172         }
1173
1174         /** Cleanup lock replay data */
1175         if (exp->exp_lock_replay_needed) {
1176                 exp->exp_lock_replay_needed = 0;
1177
1178                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1179                 atomic_dec(&obd->obd_lock_replay_clients);
1180         }
1181         spin_unlock(&exp->exp_lock);
1182 }
1183
1184 /* This function removes 1-3 references from the export:
1185  * 1 - for export pointer passed
1186  * and if disconnect really need
1187  * 2 - removing from hash
1188  * 3 - in client_unlink_export
1189  * The export pointer passed to this function can destroyed */
1190 int class_disconnect(struct obd_export *export)
1191 {
1192         int already_disconnected;
1193         ENTRY;
1194
1195         if (export == NULL) {
1196                 CWARN("attempting to free NULL export %p\n", export);
1197                 RETURN(-EINVAL);
1198         }
1199
1200         spin_lock(&export->exp_lock);
1201         already_disconnected = export->exp_disconnected;
1202         export->exp_disconnected = 1;
1203         spin_unlock(&export->exp_lock);
1204
1205         /* class_cleanup(), abort_recovery(), and class_fail_export()
1206          * all end up in here, and if any of them race we shouldn't
1207          * call extra class_export_puts(). */
1208         if (already_disconnected) {
1209                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1210                 GOTO(no_disconn, already_disconnected);
1211         }
1212
1213         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1214                export->exp_handle.h_cookie);
1215
1216         if (!hlist_unhashed(&export->exp_nid_hash))
1217                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1218                              &export->exp_connection->c_peer.nid,
1219                              &export->exp_nid_hash);
1220
1221         class_export_recovery_cleanup(export);
1222         class_unlink_export(export);
1223 no_disconn:
1224         class_export_put(export);
1225         RETURN(0);
1226 }
1227 EXPORT_SYMBOL(class_disconnect);
1228
1229 /* Return non-zero for a fully connected export */
1230 int class_connected_export(struct obd_export *exp)
1231 {
1232         int connected = 0;
1233
1234         if (exp) {
1235                 spin_lock(&exp->exp_lock);
1236                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1237                 spin_unlock(&exp->exp_lock);
1238         }
1239         return connected;
1240 }
1241 EXPORT_SYMBOL(class_connected_export);
1242
1243 static void class_disconnect_export_list(struct list_head *list,
1244                                          enum obd_option flags)
1245 {
1246         int rc;
1247         struct obd_export *exp;
1248         ENTRY;
1249
1250         /* It's possible that an export may disconnect itself, but
1251          * nothing else will be added to this list. */
1252         while (!list_empty(list)) {
1253                 exp = list_entry(list->next, struct obd_export,
1254                                  exp_obd_chain);
1255                 /* need for safe call CDEBUG after obd_disconnect */
1256                 class_export_get(exp);
1257
1258                 spin_lock(&exp->exp_lock);
1259                 exp->exp_flags = flags;
1260                 spin_unlock(&exp->exp_lock);
1261
1262                 if (obd_uuid_equals(&exp->exp_client_uuid,
1263                                     &exp->exp_obd->obd_uuid)) {
1264                         CDEBUG(D_HA,
1265                                "exp %p export uuid == obd uuid, don't discon\n",
1266                                exp);
1267                         /* Need to delete this now so we don't end up pointing
1268                          * to work_list later when this export is cleaned up. */
1269                         list_del_init(&exp->exp_obd_chain);
1270                         class_export_put(exp);
1271                         continue;
1272                 }
1273
1274                 class_export_get(exp);
1275                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1276                        "last request at "CFS_TIME_T"\n",
1277                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1278                        exp, exp->exp_last_request_time);
1279                 /* release one export reference anyway */
1280                 rc = obd_disconnect(exp);
1281
1282                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1283                        obd_export_nid2str(exp), exp, rc);
1284                 class_export_put(exp);
1285         }
1286         EXIT;
1287 }
1288
1289 void class_disconnect_exports(struct obd_device *obd)
1290 {
1291         struct list_head work_list;
1292         ENTRY;
1293
1294         /* Move all of the exports from obd_exports to a work list, en masse. */
1295         INIT_LIST_HEAD(&work_list);
1296         spin_lock(&obd->obd_dev_lock);
1297         list_splice_init(&obd->obd_exports, &work_list);
1298         list_splice_init(&obd->obd_delayed_exports, &work_list);
1299         spin_unlock(&obd->obd_dev_lock);
1300
1301         if (!list_empty(&work_list)) {
1302                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1303                        "disconnecting them\n", obd->obd_minor, obd);
1304                 class_disconnect_export_list(&work_list,
1305                                              exp_flags_from_obd(obd));
1306         } else
1307                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1308                        obd->obd_minor, obd);
1309         EXIT;
1310 }
1311 EXPORT_SYMBOL(class_disconnect_exports);
1312
1313 /* Remove exports that have not completed recovery.
1314  */
1315 void class_disconnect_stale_exports(struct obd_device *obd,
1316                                     int (*test_export)(struct obd_export *))
1317 {
1318         struct list_head work_list;
1319         struct obd_export *exp, *n;
1320         int evicted = 0;
1321         ENTRY;
1322
1323         INIT_LIST_HEAD(&work_list);
1324         spin_lock(&obd->obd_dev_lock);
1325         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1326                                  exp_obd_chain) {
1327                 /* don't count self-export as client */
1328                 if (obd_uuid_equals(&exp->exp_client_uuid,
1329                                     &exp->exp_obd->obd_uuid))
1330                         continue;
1331
1332                 /* don't evict clients which have no slot in last_rcvd
1333                  * (e.g. lightweight connection) */
1334                 if (exp->exp_target_data.ted_lr_idx == -1)
1335                         continue;
1336
1337                 spin_lock(&exp->exp_lock);
1338                 if (exp->exp_failed || test_export(exp)) {
1339                         spin_unlock(&exp->exp_lock);
1340                         continue;
1341                 }
1342                 exp->exp_failed = 1;
1343                 spin_unlock(&exp->exp_lock);
1344
1345                 list_move(&exp->exp_obd_chain, &work_list);
1346                 evicted++;
1347                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1348                        obd->obd_name, exp->exp_client_uuid.uuid,
1349                        exp->exp_connection == NULL ? "<unknown>" :
1350                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1351                 print_export_data(exp, "EVICTING", 0);
1352         }
1353         spin_unlock(&obd->obd_dev_lock);
1354
1355         if (evicted)
1356                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1357                               obd->obd_name, evicted);
1358
1359         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1360                                                  OBD_OPT_ABORT_RECOV);
1361         EXIT;
1362 }
1363 EXPORT_SYMBOL(class_disconnect_stale_exports);
1364
1365 void class_fail_export(struct obd_export *exp)
1366 {
1367         int rc, already_failed;
1368
1369         spin_lock(&exp->exp_lock);
1370         already_failed = exp->exp_failed;
1371         exp->exp_failed = 1;
1372         spin_unlock(&exp->exp_lock);
1373
1374         if (already_failed) {
1375                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1376                        exp, exp->exp_client_uuid.uuid);
1377                 return;
1378         }
1379
1380         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1381                exp, exp->exp_client_uuid.uuid);
1382
1383         if (obd_dump_on_timeout)
1384                 libcfs_debug_dumplog();
1385
1386         /* need for safe call CDEBUG after obd_disconnect */
1387         class_export_get(exp);
1388
1389         /* Most callers into obd_disconnect are removing their own reference
1390          * (request, for example) in addition to the one from the hash table.
1391          * We don't have such a reference here, so make one. */
1392         class_export_get(exp);
1393         rc = obd_disconnect(exp);
1394         if (rc)
1395                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1396         else
1397                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1398                        exp, exp->exp_client_uuid.uuid);
1399         class_export_put(exp);
1400 }
1401 EXPORT_SYMBOL(class_fail_export);
1402
1403 char *obd_export_nid2str(struct obd_export *exp)
1404 {
1405         if (exp->exp_connection != NULL)
1406                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1407
1408         return "(no nid)";
1409 }
1410 EXPORT_SYMBOL(obd_export_nid2str);
1411
1412 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1413 {
1414         struct cfs_hash *nid_hash;
1415         struct obd_export *doomed_exp = NULL;
1416         int exports_evicted = 0;
1417
1418         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1419
1420         spin_lock(&obd->obd_dev_lock);
1421         /* umount has run already, so evict thread should leave
1422          * its task to umount thread now */
1423         if (obd->obd_stopping) {
1424                 spin_unlock(&obd->obd_dev_lock);
1425                 return exports_evicted;
1426         }
1427         nid_hash = obd->obd_nid_hash;
1428         cfs_hash_getref(nid_hash);
1429         spin_unlock(&obd->obd_dev_lock);
1430
1431         do {
1432                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1433                 if (doomed_exp == NULL)
1434                         break;
1435
1436                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1437                          "nid %s found, wanted nid %s, requested nid %s\n",
1438                          obd_export_nid2str(doomed_exp),
1439                          libcfs_nid2str(nid_key), nid);
1440                 LASSERTF(doomed_exp != obd->obd_self_export,
1441                          "self-export is hashed by NID?\n");
1442                 exports_evicted++;
1443                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1444                               "request\n", obd->obd_name,
1445                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1446                               obd_export_nid2str(doomed_exp));
1447                 class_fail_export(doomed_exp);
1448                 class_export_put(doomed_exp);
1449         } while (1);
1450
1451         cfs_hash_putref(nid_hash);
1452
1453         if (!exports_evicted)
1454                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1455                        obd->obd_name, nid);
1456         return exports_evicted;
1457 }
1458 EXPORT_SYMBOL(obd_export_evict_by_nid);
1459
1460 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1461 {
1462         struct cfs_hash *uuid_hash;
1463         struct obd_export *doomed_exp = NULL;
1464         struct obd_uuid doomed_uuid;
1465         int exports_evicted = 0;
1466
1467         spin_lock(&obd->obd_dev_lock);
1468         if (obd->obd_stopping) {
1469                 spin_unlock(&obd->obd_dev_lock);
1470                 return exports_evicted;
1471         }
1472         uuid_hash = obd->obd_uuid_hash;
1473         cfs_hash_getref(uuid_hash);
1474         spin_unlock(&obd->obd_dev_lock);
1475
1476         obd_str2uuid(&doomed_uuid, uuid);
1477         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1478                 CERROR("%s: can't evict myself\n", obd->obd_name);
1479                 cfs_hash_putref(uuid_hash);
1480                 return exports_evicted;
1481         }
1482
1483         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1484
1485         if (doomed_exp == NULL) {
1486                 CERROR("%s: can't disconnect %s: no exports found\n",
1487                        obd->obd_name, uuid);
1488         } else {
1489                 CWARN("%s: evicting %s at adminstrative request\n",
1490                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1491                 class_fail_export(doomed_exp);
1492                 class_export_put(doomed_exp);
1493                 exports_evicted++;
1494         }
1495         cfs_hash_putref(uuid_hash);
1496
1497         return exports_evicted;
1498 }
1499
1500 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1501 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1502 #endif
1503
1504 static void print_export_data(struct obd_export *exp, const char *status,
1505                               int locks)
1506 {
1507         struct ptlrpc_reply_state *rs;
1508         struct ptlrpc_reply_state *first_reply = NULL;
1509         int nreplies = 0;
1510
1511         spin_lock(&exp->exp_lock);
1512         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1513                             rs_exp_list) {
1514                 if (nreplies == 0)
1515                         first_reply = rs;
1516                 nreplies++;
1517         }
1518         spin_unlock(&exp->exp_lock);
1519
1520         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1521                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1522                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1523                atomic_read(&exp->exp_rpc_count),
1524                atomic_read(&exp->exp_cb_count),
1525                atomic_read(&exp->exp_locks_count),
1526                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1527                nreplies, first_reply, nreplies > 3 ? "..." : "",
1528                exp->exp_last_committed);
1529 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1530         if (locks && class_export_dump_hook != NULL)
1531                 class_export_dump_hook(exp);
1532 #endif
1533 }
1534
1535 void dump_exports(struct obd_device *obd, int locks)
1536 {
1537         struct obd_export *exp;
1538
1539         spin_lock(&obd->obd_dev_lock);
1540         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1541                 print_export_data(exp, "ACTIVE", locks);
1542         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1543                 print_export_data(exp, "UNLINKED", locks);
1544         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1545                 print_export_data(exp, "DELAYED", locks);
1546         spin_unlock(&obd->obd_dev_lock);
1547         spin_lock(&obd_zombie_impexp_lock);
1548         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1549                 print_export_data(exp, "ZOMBIE", locks);
1550         spin_unlock(&obd_zombie_impexp_lock);
1551 }
1552
1553 void obd_exports_barrier(struct obd_device *obd)
1554 {
1555         int waited = 2;
1556         LASSERT(list_empty(&obd->obd_exports));
1557         spin_lock(&obd->obd_dev_lock);
1558         while (!list_empty(&obd->obd_unlinked_exports)) {
1559                 spin_unlock(&obd->obd_dev_lock);
1560                 set_current_state(TASK_UNINTERRUPTIBLE);
1561                 schedule_timeout(cfs_time_seconds(waited));
1562                 if (waited > 5 && IS_PO2(waited)) {
1563                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1564                                       "more than %d seconds. "
1565                                       "The obd refcount = %d. Is it stuck?\n",
1566                                       obd->obd_name, waited,
1567                                       atomic_read(&obd->obd_refcount));
1568                         dump_exports(obd, 1);
1569                 }
1570                 waited *= 2;
1571                 spin_lock(&obd->obd_dev_lock);
1572         }
1573         spin_unlock(&obd->obd_dev_lock);
1574 }
1575 EXPORT_SYMBOL(obd_exports_barrier);
1576
1577 /* Total amount of zombies to be destroyed */
1578 static int zombies_count = 0;
1579
1580 /**
1581  * kill zombie imports and exports
1582  */
1583 void obd_zombie_impexp_cull(void)
1584 {
1585         struct obd_import *import;
1586         struct obd_export *export;
1587         ENTRY;
1588
1589         do {
1590                 spin_lock(&obd_zombie_impexp_lock);
1591
1592                 import = NULL;
1593                 if (!list_empty(&obd_zombie_imports)) {
1594                         import = list_entry(obd_zombie_imports.next,
1595                                             struct obd_import,
1596                                             imp_zombie_chain);
1597                         list_del_init(&import->imp_zombie_chain);
1598                 }
1599
1600                 export = NULL;
1601                 if (!list_empty(&obd_zombie_exports)) {
1602                         export = list_entry(obd_zombie_exports.next,
1603                                             struct obd_export,
1604                                             exp_obd_chain);
1605                         list_del_init(&export->exp_obd_chain);
1606                 }
1607
1608                 spin_unlock(&obd_zombie_impexp_lock);
1609
1610                 if (import != NULL) {
1611                         class_import_destroy(import);
1612                         spin_lock(&obd_zombie_impexp_lock);
1613                         zombies_count--;
1614                         spin_unlock(&obd_zombie_impexp_lock);
1615                 }
1616
1617                 if (export != NULL) {
1618                         class_export_destroy(export);
1619                         spin_lock(&obd_zombie_impexp_lock);
1620                         zombies_count--;
1621                         spin_unlock(&obd_zombie_impexp_lock);
1622                 }
1623
1624                 cond_resched();
1625         } while (import != NULL || export != NULL);
1626         EXIT;
1627 }
1628
1629 static struct completion        obd_zombie_start;
1630 static struct completion        obd_zombie_stop;
1631 static unsigned long            obd_zombie_flags;
1632 static wait_queue_head_t        obd_zombie_waitq;
1633 static pid_t                    obd_zombie_pid;
1634
1635 enum {
1636         OBD_ZOMBIE_STOP         = 0x0001,
1637 };
1638
1639 /**
1640  * check for work for kill zombie import/export thread.
1641  */
1642 static int obd_zombie_impexp_check(void *arg)
1643 {
1644         int rc;
1645
1646         spin_lock(&obd_zombie_impexp_lock);
1647         rc = (zombies_count == 0) &&
1648              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1649         spin_unlock(&obd_zombie_impexp_lock);
1650
1651         RETURN(rc);
1652 }
1653
1654 /**
1655  * Add export to the obd_zombe thread and notify it.
1656  */
1657 static void obd_zombie_export_add(struct obd_export *exp) {
1658         spin_lock(&exp->exp_obd->obd_dev_lock);
1659         LASSERT(!list_empty(&exp->exp_obd_chain));
1660         list_del_init(&exp->exp_obd_chain);
1661         spin_unlock(&exp->exp_obd->obd_dev_lock);
1662         spin_lock(&obd_zombie_impexp_lock);
1663         zombies_count++;
1664         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1665         spin_unlock(&obd_zombie_impexp_lock);
1666
1667         obd_zombie_impexp_notify();
1668 }
1669
1670 /**
1671  * Add import to the obd_zombe thread and notify it.
1672  */
1673 static void obd_zombie_import_add(struct obd_import *imp) {
1674         LASSERT(imp->imp_sec == NULL);
1675         LASSERT(imp->imp_rq_pool == NULL);
1676         spin_lock(&obd_zombie_impexp_lock);
1677         LASSERT(list_empty(&imp->imp_zombie_chain));
1678         zombies_count++;
1679         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1680         spin_unlock(&obd_zombie_impexp_lock);
1681
1682         obd_zombie_impexp_notify();
1683 }
1684
1685 /**
1686  * notify import/export destroy thread about new zombie.
1687  */
1688 static void obd_zombie_impexp_notify(void)
1689 {
1690         /*
1691          * Make sure obd_zomebie_impexp_thread get this notification.
1692          * It is possible this signal only get by obd_zombie_barrier, and
1693          * barrier gulps this notification and sleeps away and hangs ensues
1694          */
1695         wake_up_all(&obd_zombie_waitq);
1696 }
1697
1698 /**
1699  * check whether obd_zombie is idle
1700  */
1701 static int obd_zombie_is_idle(void)
1702 {
1703         int rc;
1704
1705         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1706         spin_lock(&obd_zombie_impexp_lock);
1707         rc = (zombies_count == 0);
1708         spin_unlock(&obd_zombie_impexp_lock);
1709         return rc;
1710 }
1711
1712 /**
1713  * wait when obd_zombie import/export queues become empty
1714  */
1715 void obd_zombie_barrier(void)
1716 {
1717         struct l_wait_info lwi = { 0 };
1718
1719         if (obd_zombie_pid == current_pid())
1720                 /* don't wait for myself */
1721                 return;
1722         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1723 }
1724 EXPORT_SYMBOL(obd_zombie_barrier);
1725
1726
1727 /**
1728  * destroy zombie export/import thread.
1729  */
1730 static int obd_zombie_impexp_thread(void *unused)
1731 {
1732         unshare_fs_struct();
1733         complete(&obd_zombie_start);
1734
1735         obd_zombie_pid = current_pid();
1736
1737         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1738                 struct l_wait_info lwi = { 0 };
1739
1740                 l_wait_event(obd_zombie_waitq,
1741                              !obd_zombie_impexp_check(NULL), &lwi);
1742                 obd_zombie_impexp_cull();
1743
1744                 /*
1745                  * Notify obd_zombie_barrier callers that queues
1746                  * may be empty.
1747                  */
1748                 wake_up(&obd_zombie_waitq);
1749         }
1750
1751         complete(&obd_zombie_stop);
1752
1753         RETURN(0);
1754 }
1755
1756
1757 /**
1758  * start destroy zombie import/export thread
1759  */
1760 int obd_zombie_impexp_init(void)
1761 {
1762         struct task_struct *task;
1763
1764         INIT_LIST_HEAD(&obd_zombie_imports);
1765
1766         INIT_LIST_HEAD(&obd_zombie_exports);
1767         spin_lock_init(&obd_zombie_impexp_lock);
1768         init_completion(&obd_zombie_start);
1769         init_completion(&obd_zombie_stop);
1770         init_waitqueue_head(&obd_zombie_waitq);
1771         obd_zombie_pid = 0;
1772
1773         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1774         if (IS_ERR(task))
1775                 RETURN(PTR_ERR(task));
1776
1777         wait_for_completion(&obd_zombie_start);
1778         RETURN(0);
1779 }
1780 /**
1781  * stop destroy zombie import/export thread
1782  */
1783 void obd_zombie_impexp_stop(void)
1784 {
1785         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1786         obd_zombie_impexp_notify();
1787         wait_for_completion(&obd_zombie_stop);
1788 }
1789
1790 /***** Kernel-userspace comm helpers *******/
1791
1792 /* Get length of entire message, including header */
1793 int kuc_len(int payload_len)
1794 {
1795         return sizeof(struct kuc_hdr) + payload_len;
1796 }
1797 EXPORT_SYMBOL(kuc_len);
1798
1799 /* Get a pointer to kuc header, given a ptr to the payload
1800  * @param p Pointer to payload area
1801  * @returns Pointer to kuc header
1802  */
1803 struct kuc_hdr * kuc_ptr(void *p)
1804 {
1805         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1806         LASSERT(lh->kuc_magic == KUC_MAGIC);
1807         return lh;
1808 }
1809 EXPORT_SYMBOL(kuc_ptr);
1810
1811 /* Test if payload is part of kuc message
1812  * @param p Pointer to payload area
1813  * @returns boolean
1814  */
1815 int kuc_ispayload(void *p)
1816 {
1817         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1818
1819         if (kh->kuc_magic == KUC_MAGIC)
1820                 return 1;
1821         else
1822                 return 0;
1823 }
1824 EXPORT_SYMBOL(kuc_ispayload);
1825
1826 /* Alloc space for a message, and fill in header
1827  * @return Pointer to payload area
1828  */
1829 void *kuc_alloc(int payload_len, int transport, int type)
1830 {
1831         struct kuc_hdr *lh;
1832         int len = kuc_len(payload_len);
1833
1834         OBD_ALLOC(lh, len);
1835         if (lh == NULL)
1836                 return ERR_PTR(-ENOMEM);
1837
1838         lh->kuc_magic = KUC_MAGIC;
1839         lh->kuc_transport = transport;
1840         lh->kuc_msgtype = type;
1841         lh->kuc_msglen = len;
1842
1843         return (void *)(lh + 1);
1844 }
1845 EXPORT_SYMBOL(kuc_alloc);
1846
1847 /* Takes pointer to payload area */
1848 inline void kuc_free(void *p, int payload_len)
1849 {
1850         struct kuc_hdr *lh = kuc_ptr(p);
1851         OBD_FREE(lh, kuc_len(payload_len));
1852 }
1853 EXPORT_SYMBOL(kuc_free);
1854
1855 struct obd_request_slot_waiter {
1856         struct list_head        orsw_entry;
1857         wait_queue_head_t       orsw_waitq;
1858         bool                    orsw_signaled;
1859 };
1860
1861 static bool obd_request_slot_avail(struct client_obd *cli,
1862                                    struct obd_request_slot_waiter *orsw)
1863 {
1864         bool avail;
1865
1866         spin_lock(&cli->cl_loi_list_lock);
1867         avail = !!list_empty(&orsw->orsw_entry);
1868         spin_unlock(&cli->cl_loi_list_lock);
1869
1870         return avail;
1871 };
1872
1873 /*
1874  * For network flow control, the RPC sponsor needs to acquire a credit
1875  * before sending the RPC. The credits count for a connection is defined
1876  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1877  * the subsequent RPC sponsors need to wait until others released their
1878  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1879  */
1880 int obd_get_request_slot(struct client_obd *cli)
1881 {
1882         struct obd_request_slot_waiter   orsw;
1883         struct l_wait_info               lwi;
1884         int                              rc;
1885
1886         spin_lock(&cli->cl_loi_list_lock);
1887         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1888                 cli->cl_r_in_flight++;
1889                 spin_unlock(&cli->cl_loi_list_lock);
1890                 return 0;
1891         }
1892
1893         init_waitqueue_head(&orsw.orsw_waitq);
1894         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1895         orsw.orsw_signaled = false;
1896         spin_unlock(&cli->cl_loi_list_lock);
1897
1898         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1899         rc = l_wait_event(orsw.orsw_waitq,
1900                           obd_request_slot_avail(cli, &orsw) ||
1901                           orsw.orsw_signaled,
1902                           &lwi);
1903
1904         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1905          * freed but other (such as obd_put_request_slot) is using it. */
1906         spin_lock(&cli->cl_loi_list_lock);
1907         if (rc != 0) {
1908                 if (!orsw.orsw_signaled) {
1909                         if (list_empty(&orsw.orsw_entry))
1910                                 cli->cl_r_in_flight--;
1911                         else
1912                                 list_del(&orsw.orsw_entry);
1913                 }
1914         }
1915
1916         if (orsw.orsw_signaled) {
1917                 LASSERT(list_empty(&orsw.orsw_entry));
1918
1919                 rc = -EINTR;
1920         }
1921         spin_unlock(&cli->cl_loi_list_lock);
1922
1923         return rc;
1924 }
1925 EXPORT_SYMBOL(obd_get_request_slot);
1926
1927 void obd_put_request_slot(struct client_obd *cli)
1928 {
1929         struct obd_request_slot_waiter *orsw;
1930
1931         spin_lock(&cli->cl_loi_list_lock);
1932         cli->cl_r_in_flight--;
1933
1934         /* If there is free slot, wakeup the first waiter. */
1935         if (!list_empty(&cli->cl_loi_read_list) &&
1936             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
1937                 orsw = list_entry(cli->cl_loi_read_list.next,
1938                                   struct obd_request_slot_waiter, orsw_entry);
1939                 list_del_init(&orsw->orsw_entry);
1940                 cli->cl_r_in_flight++;
1941                 wake_up(&orsw->orsw_waitq);
1942         }
1943         spin_unlock(&cli->cl_loi_list_lock);
1944 }
1945 EXPORT_SYMBOL(obd_put_request_slot);
1946
1947 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1948 {
1949         return cli->cl_max_rpcs_in_flight;
1950 }
1951 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
1952
1953 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1954 {
1955         struct obd_request_slot_waiter *orsw;
1956         __u32                           old;
1957         int                             diff;
1958         int                             i;
1959         char                            *typ_name;
1960         int                             rc;
1961
1962         if (max > OBD_MAX_RIF_MAX || max < 1)
1963                 return -ERANGE;
1964
1965         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
1966         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
1967                 /* adjust max_mod_rpcs_in_flight to ensure it is always
1968                  * strictly lower that max_rpcs_in_flight */
1969                 if (max < 2) {
1970                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
1971                                "because it must be higher than "
1972                                "max_mod_rpcs_in_flight value",
1973                                cli->cl_import->imp_obd->obd_name);
1974                         return -ERANGE;
1975                 }
1976                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
1977                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
1978                         if (rc != 0)
1979                                 return rc;
1980                 }
1981         }
1982
1983         spin_lock(&cli->cl_loi_list_lock);
1984         old = cli->cl_max_rpcs_in_flight;
1985         cli->cl_max_rpcs_in_flight = max;
1986         diff = max - old;
1987
1988         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
1989         for (i = 0; i < diff; i++) {
1990                 if (list_empty(&cli->cl_loi_read_list))
1991                         break;
1992
1993                 orsw = list_entry(cli->cl_loi_read_list.next,
1994                                   struct obd_request_slot_waiter, orsw_entry);
1995                 list_del_init(&orsw->orsw_entry);
1996                 cli->cl_r_in_flight++;
1997                 wake_up(&orsw->orsw_waitq);
1998         }
1999         spin_unlock(&cli->cl_loi_list_lock);
2000
2001         return 0;
2002 }
2003 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2004
2005 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2006 {
2007         return cli->cl_max_mod_rpcs_in_flight;
2008 }
2009 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2010
2011 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2012 {
2013         struct obd_connect_data *ocd;
2014         __u16 maxmodrpcs;
2015
2016         if (max > OBD_MAX_RIF_MAX || max < 1)
2017                 return -ERANGE;
2018
2019         /* cannot exceed or equal max_rpcs_in_flight */
2020         if (max >= cli->cl_max_rpcs_in_flight) {
2021                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2022                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2023                        cli->cl_import->imp_obd->obd_name,
2024                        max, cli->cl_max_rpcs_in_flight);
2025                 return -ERANGE;
2026         }
2027
2028         /* cannot exceed max modify RPCs in flight supported by the server */
2029         ocd = &cli->cl_import->imp_connect_data;
2030         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2031                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2032         else
2033                 maxmodrpcs = 1;
2034         if (max > maxmodrpcs) {
2035                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2036                        "higher than max_mod_rpcs_per_client value (%hu) "
2037                        "returned by the server at connection\n",
2038                        cli->cl_import->imp_obd->obd_name,
2039                        max, maxmodrpcs);
2040                 return -ERANGE;
2041         }
2042
2043         cli->cl_max_mod_rpcs_in_flight = max;
2044
2045         /* will have to wakeup waiters if max has been increased */
2046
2047         return 0;
2048 }
2049 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2050