Whamcloud - gitweb
LU-6770 osc: use global osc_rq_pool to reduce memory usage
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43
44 #include <linux/kthread.h>
45 #include <obd_class.h>
46 #include <lprocfs_status.h>
47 #include <lustre_kernelcomm.h>
48
/* Taken in this file around every walk or update of the obd_types list. */
spinlock_t obd_types_lock;

/* Slab caches: created by obd_init_caches() and destroyed by
 * obd_cleanup_caches().  They back struct obd_device, struct obdo and
 * struct obd_import respectively. */
static struct kmem_cache *obd_device_cachep;
struct kmem_cache *obdo_cachep;
EXPORT_SYMBOL(obdo_cachep);
static struct kmem_cache *import_cachep;

/* NOTE(review): presumably lists of imports/exports waiting for deferred
 * destruction, protected by obd_zombie_impexp_lock -- their users are not
 * visible in this part of the file; confirm. */
static struct list_head obd_zombie_imports;
static struct list_head obd_zombie_exports;
static spinlock_t  obd_zombie_impexp_lock;

/* Forward declarations for helpers defined later in the file. */
static void obd_zombie_impexp_notify(void);
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks);

/* Hook called by class_export_destroy() to drop an export's connection.
 * NOTE(review): the pointer is not assigned in the visible code; presumably
 * another module installs it -- confirm. */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
68
69 /*
70  * support functions: we could use inter-module communication, but this
71  * is more portable to other OS's
72  */
73 static struct obd_device *obd_device_alloc(void)
74 {
75         struct obd_device *obd;
76
77         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
78         if (obd != NULL) {
79                 obd->obd_magic = OBD_DEVICE_MAGIC;
80         }
81         return obd;
82 }
83
/**
 * Return an obd_device to obd_device_cachep.
 *
 * \param[in] obd  device to free; asserted to be non-NULL and to carry
 *                 OBD_DEVICE_MAGIC.
 *
 * A device that still has an obd_namespace attached is reported through
 * CERROR and then hits LBUG().
 */
static void obd_device_free(struct obd_device *obd)
{
        LASSERT(obd != NULL);
        LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
                 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
        /* a namespace left behind means teardown was incomplete */
        if (obd->obd_namespace != NULL) {
                CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
                       obd, obd->obd_namespace, obd->obd_force);
                LBUG();
        }
        lu_ref_fini(&obd->obd_reference);
        OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
}
97
98 struct obd_type *class_search_type(const char *name)
99 {
100         struct list_head *tmp;
101         struct obd_type *type;
102
103         spin_lock(&obd_types_lock);
104         list_for_each(tmp, &obd_types) {
105                 type = list_entry(tmp, struct obd_type, typ_chain);
106                 if (strcmp(type->typ_name, name) == 0) {
107                         spin_unlock(&obd_types_lock);
108                         return type;
109                 }
110         }
111         spin_unlock(&obd_types_lock);
112         return NULL;
113 }
114 EXPORT_SYMBOL(class_search_type);
115
116 struct obd_type *class_get_type(const char *name)
117 {
118         struct obd_type *type = class_search_type(name);
119
120 #ifdef HAVE_MODULE_LOADING_SUPPORT
121         if (!type) {
122                 const char *modname = name;
123
124                 if (strcmp(modname, "obdfilter") == 0)
125                         modname = "ofd";
126
127                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
128                         modname = LUSTRE_OSP_NAME;
129
130                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
131                         modname = LUSTRE_MDT_NAME;
132
133                 if (!request_module("%s", modname)) {
134                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
135                         type = class_search_type(name);
136                 } else {
137                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
138                                            modname);
139                 }
140         }
141 #endif
142         if (type) {
143                 spin_lock(&type->obd_type_lock);
144                 type->typ_refcnt++;
145                 try_module_get(type->typ_dt_ops->o_owner);
146                 spin_unlock(&type->obd_type_lock);
147         }
148         return type;
149 }
150
151 void class_put_type(struct obd_type *type)
152 {
153         LASSERT(type);
154         spin_lock(&type->obd_type_lock);
155         type->typ_refcnt--;
156         module_put(type->typ_dt_ops->o_owner);
157         spin_unlock(&type->obd_type_lock);
158 }
159
160 #define CLASS_MAX_NAME 1024
161
/**
 * Register a new obd type.
 *
 * Allocates an obd_type, stores private copies of the operation tables
 * and of the name, optionally registers a proc directory and initialises
 * the lu_device_type, then links the type onto obd_types.
 *
 * \param[in] dt_ops       obd operations; copied into the new type.
 * \param[in] md_ops       metadata operations; optional, copied when
 *                         non-NULL.
 * \param[in] enable_proc  when true (and CONFIG_PROC_FS is set), register
 *                         a proc directory named after the type.
 * \param[in] vars         proc variables passed to lprocfs_register().
 * \param[in] name         type name; asserted to be shorter than
 *                         CLASS_MAX_NAME.
 * \param[in] ldt          optional lu_device_type to initialise.
 *
 * \retval 0        on success.
 * \retval -EEXIST  if a type with this name is already registered.
 * \retval -ENOMEM  if an allocation failed.
 * \retval negative error from lprocfs_register() or
 *         lu_device_type_init() otherwise.
 */
int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
                        bool enable_proc, struct lprocfs_vars *vars,
                        const char *name, struct lu_device_type *ldt)
{
        struct obd_type *type;
        int rc = 0;
        ENTRY;

        /* sanity check */
        LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);

        if (class_search_type(name)) {
                CDEBUG(D_IOCTL, "Type %s already registered\n", name);
                RETURN(-EEXIST);
        }

        rc = -ENOMEM;
        OBD_ALLOC(type, sizeof(*type));
        if (type == NULL)
                RETURN(rc);

        /* all three allocations are attempted first and checked together;
         * the failed: path frees whichever of them succeeded.
         * NOTE(review): this relies on OBD_ALLOC returning zeroed memory so
         * that members never assigned read as NULL -- confirm. */
        OBD_ALLOC_PTR(type->typ_dt_ops);
        OBD_ALLOC_PTR(type->typ_md_ops);
        OBD_ALLOC(type->typ_name, strlen(name) + 1);

        if (type->typ_dt_ops == NULL ||
            type->typ_md_ops == NULL ||
            type->typ_name == NULL)
                GOTO (failed, rc);

        *(type->typ_dt_ops) = *dt_ops;
        /* md_ops is optional */
        if (md_ops)
                *(type->typ_md_ops) = *md_ops;
        strcpy(type->typ_name, name);
        spin_lock_init(&type->obd_type_lock);

#ifdef CONFIG_PROC_FS
        if (enable_proc) {
                type->typ_procroot = lprocfs_register(type->typ_name,
                                                      proc_lustre_root,
                                                      vars, type);
                if (IS_ERR(type->typ_procroot)) {
                        rc = PTR_ERR(type->typ_procroot);
                        /* clear it so failed: does not try to remove it */
                        type->typ_procroot = NULL;
                        GOTO(failed, rc);
                }
        }
#endif
        if (ldt != NULL) {
                type->typ_lu = ldt;
                rc = lu_device_type_init(ldt);
                if (rc != 0)
                        GOTO (failed, rc);
        }

        /* publish the fully initialised type */
        spin_lock(&obd_types_lock);
        list_add(&type->typ_chain, &obd_types);
        spin_unlock(&obd_types_lock);

        RETURN (0);

failed:
        /* undo whatever was set up, in reverse order */
        if (type->typ_name != NULL) {
#ifdef CONFIG_PROC_FS
                if (type->typ_procroot != NULL)
                        remove_proc_subtree(type->typ_name, proc_lustre_root);
#endif
                OBD_FREE(type->typ_name, strlen(name) + 1);
        }
        if (type->typ_md_ops != NULL)
                OBD_FREE_PTR(type->typ_md_ops);
        if (type->typ_dt_ops != NULL)
                OBD_FREE_PTR(type->typ_dt_ops);
        OBD_FREE(type, sizeof(*type));
        RETURN(rc);
}
EXPORT_SYMBOL(class_register_type);
240
/**
 * Unregister an obd type and free everything class_register_type()
 * allocated for it.
 *
 * \param[in] name  name of the type to remove.
 *
 * \retval 0        on success.
 * \retval -EINVAL  if no such type is registered.
 * \retval -EBUSY   if the type still has references; in that case the
 *                  operation tables are freed but the type itself stays
 *                  on the obd_types list.
 */
int class_unregister_type(const char *name)
{
        struct obd_type *type = class_search_type(name);
        ENTRY;

        if (!type) {
                CERROR("unknown obd type\n");
                RETURN(-EINVAL);
        }

        if (type->typ_refcnt) {
                CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
                /* This is a bad situation, let's make the best of it */
                /* Remove ops, but leave the name for debugging */
                /* NOTE(review): the type stays on obd_types while typ_dt_ops
                 * and typ_md_ops have been freed here; class_get_type() and
                 * class_put_type() dereference typ_dt_ops, and a second
                 * unregister attempt would reach these frees again --
                 * confirm this is acceptable. */
                OBD_FREE_PTR(type->typ_dt_ops);
                OBD_FREE_PTR(type->typ_md_ops);
                RETURN(-EBUSY);
        }

        /* we do not use type->typ_procroot as for compatibility purposes
         * other modules can share names (i.e. lod can use lov entry). so
         * we can't reference pointer as it can get invalided when another
         * module removes the entry */
#ifdef CONFIG_PROC_FS
        if (type->typ_procroot != NULL)
                remove_proc_subtree(type->typ_name, proc_lustre_root);
        if (type->typ_procsym != NULL)
                lprocfs_remove(&type->typ_procsym);
#endif
        if (type->typ_lu)
                lu_device_type_fini(type->typ_lu);

        spin_lock(&obd_types_lock);
        list_del(&type->typ_chain);
        spin_unlock(&obd_types_lock);
        /* name equals type->typ_name (it matched in class_search_type()),
         * so strlen(name) + 1 is the size the name was allocated with */
        OBD_FREE(type->typ_name, strlen(name) + 1);
        if (type->typ_dt_ops != NULL)
                OBD_FREE_PTR(type->typ_dt_ops);
        if (type->typ_md_ops != NULL)
                OBD_FREE_PTR(type->typ_md_ops);
        OBD_FREE(type, sizeof(*type));
        RETURN(0);
} /* class_unregister_type */
EXPORT_SYMBOL(class_unregister_type);
285
/**
 * Create a new obd device.
 *
 * Find an empty slot in ::obd_devs[], create a new obd device in it.
 * The whole table is scanned under obd_dev_lock: the first free slot is
 * claimed, but the scan continues so that an existing device with the
 * same name at a higher index is still detected, in which case the claim
 * is undone.
 *
 * A reference on the obd type is taken through class_get_type(); it is
 * kept on success and dropped on every failure path.
 *
 * \param[in] type_name obd device type string.
 * \param[in] name      obd device name.
 *
 * \retval ERR_PTR(-EINVAL)    if \a name is MAX_OBD_NAME bytes or longer.
 * \retval ERR_PTR(-ENODEV)    if \a type_name is not a known obd type.
 * \retval ERR_PTR(-ENOMEM)    if the device could not be allocated.
 * \retval ERR_PTR(-EEXIST)    if a device called \a name already exists.
 * \retval ERR_PTR(-EOVERFLOW) if every slot of ::obd_devs[] is in use.
 * \retval otherwise the obd device pointer created (never NULL).
 */
struct obd_device *class_newdev(const char *type_name, const char *name)
{
        struct obd_device *result = NULL;
        struct obd_device *newdev;
        struct obd_type *type = NULL;
        int i;
        int new_obd_minor = 0;
        ENTRY;

        if (strlen(name) >= MAX_OBD_NAME) {
                CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
                RETURN(ERR_PTR(-EINVAL));
        }

        type = class_get_type(type_name);
        if (type == NULL){
                CERROR("OBD: unknown type: %s\n", type_name);
                RETURN(ERR_PTR(-ENODEV));
        }

        newdev = obd_device_alloc();
        if (newdev == NULL)
                GOTO(out_type, result = ERR_PTR(-ENOMEM));

        LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);

        write_lock(&obd_dev_lock);
        for (i = 0; i < class_devno_max(); i++) {
                struct obd_device *obd = class_num2obd(i);

                if (obd && (strcmp(name, obd->obd_name) == 0)) {
                        CERROR("Device %s already exists at %d, won't add\n",
                               name, i);
                        /* a slot was already claimed earlier in this scan:
                         * give it back before reporting the duplicate */
                        if (result) {
                                LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
                                         "%p obd_magic %08x != %08x\n", result,
                                         result->obd_magic, OBD_DEVICE_MAGIC);
                                LASSERTF(result->obd_minor == new_obd_minor,
                                         "%p obd_minor %d != %d\n", result,
                                         result->obd_minor, new_obd_minor);

                                obd_devs[result->obd_minor] = NULL;
                                result->obd_name[0]='\0';
                         }
                        result = ERR_PTR(-EEXIST);
                        break;
                }
                /* claim the first free slot, but keep scanning for
                 * duplicates of the name */
                if (!result && !obd) {
                        result = newdev;
                        result->obd_minor = i;
                        new_obd_minor = i;
                        result->obd_type = type;
                        /* name length was checked against MAX_OBD_NAME above.
                         * NOTE(review): termination relies on the slab
                         * allocation being zero-filled -- confirm. */
                        strncpy(result->obd_name, name,
                                sizeof(result->obd_name) - 1);
                        obd_devs[i] = result;
                }
        }
        write_unlock(&obd_dev_lock);

        if (result == NULL && i >= class_devno_max()) {
                CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
                       class_devno_max());
                GOTO(out, result = ERR_PTR(-EOVERFLOW));
        }

        if (IS_ERR(result))
                GOTO(out, result);

        CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
               result->obd_name, result);

        RETURN(result);
out:
        obd_device_free(newdev);
out_type:
        class_put_type(type);
        return result;
}
375
/**
 * Remove an obd device from ::obd_devs[], free it and drop the reference
 * it held on its obd type.
 *
 * \param[in] obd  device to release; asserted to carry OBD_DEVICE_MAGIC,
 *                 to be the entry stored at obd_devs[obd->obd_minor] and
 *                 to have a non-NULL obd_type.
 */
void class_release_dev(struct obd_device *obd)
{
        /* saved up front: obd is freed before the type reference is put */
        struct obd_type *obd_type = obd->obd_type;

        LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
                 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
        LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
                 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
        LASSERT(obd_type != NULL);

        CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
               obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);

        write_lock(&obd_dev_lock);
        obd_devs[obd->obd_minor] = NULL;
        write_unlock(&obd_dev_lock);
        obd_device_free(obd);

        class_put_type(obd_type);
}
396
397 int class_name2dev(const char *name)
398 {
399         int i;
400
401         if (!name)
402                 return -1;
403
404         read_lock(&obd_dev_lock);
405         for (i = 0; i < class_devno_max(); i++) {
406                 struct obd_device *obd = class_num2obd(i);
407
408                 if (obd && strcmp(name, obd->obd_name) == 0) {
409                         /* Make sure we finished attaching before we give
410                            out any references */
411                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
412                         if (obd->obd_attached) {
413                                 read_unlock(&obd_dev_lock);
414                                 return i;
415                         }
416                         break;
417                 }
418         }
419         read_unlock(&obd_dev_lock);
420
421         return -1;
422 }
423
424 struct obd_device *class_name2obd(const char *name)
425 {
426         int dev = class_name2dev(name);
427
428         if (dev < 0 || dev > class_devno_max())
429                 return NULL;
430         return class_num2obd(dev);
431 }
432 EXPORT_SYMBOL(class_name2obd);
433
434 int class_uuid2dev(struct obd_uuid *uuid)
435 {
436         int i;
437
438         read_lock(&obd_dev_lock);
439         for (i = 0; i < class_devno_max(); i++) {
440                 struct obd_device *obd = class_num2obd(i);
441
442                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
443                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
444                         read_unlock(&obd_dev_lock);
445                         return i;
446                 }
447         }
448         read_unlock(&obd_dev_lock);
449
450         return -1;
451 }
452
453 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
454 {
455         int dev = class_uuid2dev(uuid);
456         if (dev < 0)
457                 return NULL;
458         return class_num2obd(dev);
459 }
460 EXPORT_SYMBOL(class_uuid2obd);
461
462 /**
463  * Get obd device from ::obd_devs[]
464  *
465  * \param num [in] array index
466  *
467  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
468  *         otherwise return the obd device there.
469  */
470 struct obd_device *class_num2obd(int num)
471 {
472         struct obd_device *obd = NULL;
473
474         if (num < class_devno_max()) {
475                 obd = obd_devs[num];
476                 if (obd == NULL)
477                         return NULL;
478
479                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
480                          "%p obd_magic %08x != %08x\n",
481                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
482                 LASSERTF(obd->obd_minor == num,
483                          "%p obd_minor %0d != %0d\n",
484                          obd, obd->obd_minor, num);
485         }
486
487         return obd;
488 }
489
490 /**
491  * Get obd devices count. Device in any
492  *    state are counted
493  * \retval obd device count
494  */
495 int get_devices_count(void)
496 {
497         int index, max_index = class_devno_max(), dev_count = 0;
498
499         read_lock(&obd_dev_lock);
500         for (index = 0; index <= max_index; index++) {
501                 struct obd_device *obd = class_num2obd(index);
502                 if (obd != NULL)
503                         dev_count++;
504         }
505         read_unlock(&obd_dev_lock);
506
507         return dev_count;
508 }
509 EXPORT_SYMBOL(get_devices_count);
510
511 void class_obd_list(void)
512 {
513         char *status;
514         int i;
515
516         read_lock(&obd_dev_lock);
517         for (i = 0; i < class_devno_max(); i++) {
518                 struct obd_device *obd = class_num2obd(i);
519
520                 if (obd == NULL)
521                         continue;
522                 if (obd->obd_stopping)
523                         status = "ST";
524                 else if (obd->obd_set_up)
525                         status = "UP";
526                 else if (obd->obd_attached)
527                         status = "AT";
528                 else
529                         status = "--";
530                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
531                          i, status, obd->obd_type->typ_name,
532                          obd->obd_name, obd->obd_uuid.uuid,
533                          atomic_read(&obd->obd_refcount));
534         }
535         read_unlock(&obd_dev_lock);
536         return;
537 }
538
539 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
540    specified, then only the client with that uuid is returned,
541    otherwise any client connected to the tgt is returned. */
542 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
543                                           const char * typ_name,
544                                           struct obd_uuid *grp_uuid)
545 {
546         int i;
547
548         read_lock(&obd_dev_lock);
549         for (i = 0; i < class_devno_max(); i++) {
550                 struct obd_device *obd = class_num2obd(i);
551
552                 if (obd == NULL)
553                         continue;
554                 if ((strncmp(obd->obd_type->typ_name, typ_name,
555                              strlen(typ_name)) == 0)) {
556                         if (obd_uuid_equals(tgt_uuid,
557                                             &obd->u.cli.cl_target_uuid) &&
558                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
559                                                          &obd->obd_uuid) : 1)) {
560                                 read_unlock(&obd_dev_lock);
561                                 return obd;
562                         }
563                 }
564         }
565         read_unlock(&obd_dev_lock);
566
567         return NULL;
568 }
569 EXPORT_SYMBOL(class_find_client_obd);
570
571 /* Iterate the obd_device list looking devices have grp_uuid. Start
572    searching at *next, and if a device is found, the next index to look
573    at is saved in *next. If next is NULL, then the first matching device
574    will always be returned. */
575 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
576 {
577         int i;
578
579         if (next == NULL)
580                 i = 0;
581         else if (*next >= 0 && *next < class_devno_max())
582                 i = *next;
583         else
584                 return NULL;
585
586         read_lock(&obd_dev_lock);
587         for (; i < class_devno_max(); i++) {
588                 struct obd_device *obd = class_num2obd(i);
589
590                 if (obd == NULL)
591                         continue;
592                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
593                         if (next != NULL)
594                                 *next = i+1;
595                         read_unlock(&obd_dev_lock);
596                         return obd;
597                 }
598         }
599         read_unlock(&obd_dev_lock);
600
601         return NULL;
602 }
603 EXPORT_SYMBOL(class_devices_in_group);
604
/**
 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
 * adjust sptlrpc settings accordingly.
 *
 * Every set-up, non-stopping device of type mdc, osc, mdt or ost whose
 * name starts with the first \a namelen bytes of \a fsname receives a
 * KEY_SPTLRPC_CONF obd_set_info_async() call on its self export.
 *
 * \param[in] fsname   file system name to match device names against.
 * \param[in] namelen  number of bytes of \a fsname to compare; asserted
 *                     to be positive.
 *
 * \retval 0 if every notification succeeded (or none was sent),
 *         otherwise the first non-zero return code encountered.
 */
int class_notify_sptlrpc_conf(const char *fsname, int namelen)
{
        struct obd_device  *obd;
        const char         *type;
        int                 i, rc = 0, rc2;

        LASSERT(namelen > 0);

        read_lock(&obd_dev_lock);
        for (i = 0; i < class_devno_max(); i++) {
                obd = class_num2obd(i);

                if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
                        continue;

                /* only notify mdc, osc, mdt, ost */
                type = obd->obd_type->typ_name;
                if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
                    strcmp(type, LUSTRE_OSC_NAME) != 0 &&
                    strcmp(type, LUSTRE_MDT_NAME) != 0 &&
                    strcmp(type, LUSTRE_OST_NAME) != 0)
                        continue;

                if (strncmp(obd->obd_name, fsname, namelen))
                        continue;

                /* hold a reference on the device, then drop obd_dev_lock
                 * for the duration of the call and retake it afterwards */
                class_incref(obd, __FUNCTION__, obd);
                read_unlock(&obd_dev_lock);
                rc2 = obd_set_info_async(NULL, obd->obd_self_export,
                                         sizeof(KEY_SPTLRPC_CONF),
                                         KEY_SPTLRPC_CONF, 0, NULL, NULL);
                /* remember only the first failure */
                rc = rc ? rc : rc2;
                class_decref(obd, __FUNCTION__, obd);
                read_lock(&obd_dev_lock);
        }
        read_unlock(&obd_dev_lock);
        return rc;
}
EXPORT_SYMBOL(class_notify_sptlrpc_conf);
648
649 void obd_cleanup_caches(void)
650 {
651         ENTRY;
652         if (obd_device_cachep) {
653                 kmem_cache_destroy(obd_device_cachep);
654                 obd_device_cachep = NULL;
655         }
656         if (obdo_cachep) {
657                 kmem_cache_destroy(obdo_cachep);
658                 obdo_cachep = NULL;
659         }
660         if (import_cachep) {
661                 kmem_cache_destroy(import_cachep);
662                 import_cachep = NULL;
663         }
664
665         EXIT;
666 }
667
668 int obd_init_caches(void)
669 {
670         int rc;
671         ENTRY;
672
673         LASSERT(obd_device_cachep == NULL);
674         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
675                                               sizeof(struct obd_device),
676                                               0, 0, NULL);
677         if (!obd_device_cachep)
678                 GOTO(out, rc = -ENOMEM);
679
680         LASSERT(obdo_cachep == NULL);
681         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
682                                         0, 0, NULL);
683         if (!obdo_cachep)
684                 GOTO(out, rc = -ENOMEM);
685
686         LASSERT(import_cachep == NULL);
687         import_cachep = kmem_cache_create("ll_import_cache",
688                                           sizeof(struct obd_import),
689                                           0, 0, NULL);
690         if (!import_cachep)
691                 GOTO(out, rc = -ENOMEM);
692
693         RETURN(0);
694 out:
695         obd_cleanup_caches();
696         RETURN(rc);
697 }
698
699 /* map connection to client */
700 struct obd_export *class_conn2export(struct lustre_handle *conn)
701 {
702         struct obd_export *export;
703         ENTRY;
704
705         if (!conn) {
706                 CDEBUG(D_CACHE, "looking for null handle\n");
707                 RETURN(NULL);
708         }
709
710         if (conn->cookie == -1) {  /* this means assign a new connection */
711                 CDEBUG(D_CACHE, "want a new connection\n");
712                 RETURN(NULL);
713         }
714
715         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
716         export = class_handle2object(conn->cookie, NULL);
717         RETURN(export);
718 }
719 EXPORT_SYMBOL(class_conn2export);
720
721 struct obd_device *class_exp2obd(struct obd_export *exp)
722 {
723         if (exp)
724                 return exp->exp_obd;
725         return NULL;
726 }
727 EXPORT_SYMBOL(class_exp2obd);
728
729 struct obd_device *class_conn2obd(struct lustre_handle *conn)
730 {
731         struct obd_export *export;
732         export = class_conn2export(conn);
733         if (export) {
734                 struct obd_device *obd = export->exp_obd;
735                 class_export_put(export);
736                 return obd;
737         }
738         return NULL;
739 }
740
741 struct obd_import *class_exp2cliimp(struct obd_export *exp)
742 {
743         struct obd_device *obd = exp->exp_obd;
744         if (obd == NULL)
745                 return NULL;
746         return obd->u.cli.cl_import;
747 }
748 EXPORT_SYMBOL(class_exp2cliimp);
749
750 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
751 {
752         struct obd_device *obd = class_conn2obd(conn);
753         if (obd == NULL)
754                 return NULL;
755         return obd->u.cli.cl_import;
756 }
757
/* Export management functions */

/**
 * Final teardown of an export whose reference count has reached zero.
 *
 * Releases the export's connection (if any), asserts that its reply and
 * RPC lists are empty, lets the obd type destroy its per-export state,
 * drops the "export" reference on the obd device and finally frees the
 * export through OBD_FREE_RCU.
 *
 * \param[in] exp  export to destroy; its refcount is asserted to be zero
 *                 and its exp_obd to be non-NULL.
 */
static void class_export_destroy(struct obd_export *exp)
{
        struct obd_device *obd = exp->exp_obd;
        ENTRY;

        LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
        LASSERT(obd != NULL);

        CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
               exp->exp_client_uuid.uuid, obd->obd_name);

        /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
        if (exp->exp_connection)
                ptlrpc_put_connection_superhack(exp->exp_connection);

        /* nothing may still be queued on the export at this point */
        LASSERT(list_empty(&exp->exp_outstanding_replies));
        LASSERT(list_empty(&exp->exp_uncommitted_replies));
        LASSERT(list_empty(&exp->exp_req_replay_queue));
        LASSERT(list_empty(&exp->exp_hp_rpcs));
        obd_destroy_export(exp);
        /* pairs with class_incref(obd, "export", export) in
         * class_new_export() */
        class_decref(obd, "export", exp);

        OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
        EXIT;
}
784
/* hop_addref callback for export handles: takes a reference on the
 * export via class_export_get(). */
static void export_handle_addref(void *export)
{
        class_export_get(export);
}

/* Handle operations passed to class_handle_hash() for every new export;
 * no hop_free callback is provided. */
static struct portals_handle_ops export_handle_ops = {
        .hop_addref = export_handle_addref,
        .hop_free   = NULL,
};
794
795 struct obd_export *class_export_get(struct obd_export *exp)
796 {
797         atomic_inc(&exp->exp_refcount);
798         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
799                atomic_read(&exp->exp_refcount));
800         return exp;
801 }
802 EXPORT_SYMBOL(class_export_get);
803
804 void class_export_put(struct obd_export *exp)
805 {
806         LASSERT(exp != NULL);
807         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
808         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
809                atomic_read(&exp->exp_refcount) - 1);
810
811         if (atomic_dec_and_test(&exp->exp_refcount)) {
812                 LASSERT(!list_empty(&exp->exp_obd_chain));
813                 CDEBUG(D_IOCTL, "final put %p/%s\n",
814                        exp, exp->exp_client_uuid.uuid);
815
816                 /* release nid stat refererence */
817                 lprocfs_exp_cleanup(exp);
818
819                 obd_zombie_export_add(exp);
820         }
821 }
822 EXPORT_SYMBOL(class_export_put);
823
/* Creates a new export, adds it to the hash table, and returns a
 * pointer to it. The refcount is 2: one for the hash reference, and
 * one for the pointer returned by this function.
 *
 * The export is inserted into the device's uuid hash only when cluuid
 * differs from the device's own uuid.  On success the export is linked
 * onto the device's obd_exports and obd_exports_timed lists and holds an
 * "export" reference on the device.
 *
 * Returns the new export, or ERR_PTR(-ENOMEM) if allocation failed,
 * ERR_PTR(-ENODEV) if the device is stopping or has no uuid hash, or
 * ERR_PTR(-EALREADY) if an export for cluuid is already hashed. */
struct obd_export *class_new_export(struct obd_device *obd,
                                    struct obd_uuid *cluuid)
{
        struct obd_export *export;
        struct cfs_hash *hash = NULL;
        int rc = 0;
        ENTRY;

        OBD_ALLOC_PTR(export);
        if (!export)
                return ERR_PTR(-ENOMEM);

        export->exp_conn_cnt = 0;
        export->exp_lock_hash = NULL;
        export->exp_flock_hash = NULL;
        /* see the function comment for who owns the two references */
        atomic_set(&export->exp_refcount, 2);
        atomic_set(&export->exp_rpc_count, 0);
        atomic_set(&export->exp_cb_count, 0);
        atomic_set(&export->exp_locks_count, 0);
#if LUSTRE_TRACKS_LOCK_EXP_REFS
        INIT_LIST_HEAD(&export->exp_locks_list);
        spin_lock_init(&export->exp_locks_list_guard);
#endif
        atomic_set(&export->exp_replay_count, 0);
        export->exp_obd = obd;
        INIT_LIST_HEAD(&export->exp_outstanding_replies);
        spin_lock_init(&export->exp_uncommitted_replies_lock);
        INIT_LIST_HEAD(&export->exp_uncommitted_replies);
        INIT_LIST_HEAD(&export->exp_req_replay_queue);
        INIT_LIST_HEAD(&export->exp_handle.h_link);
        INIT_LIST_HEAD(&export->exp_hp_rpcs);
        INIT_LIST_HEAD(&export->exp_reg_rpcs);
        class_handle_hash(&export->exp_handle, &export_handle_ops);
        export->exp_last_request_time = cfs_time_current_sec();
        spin_lock_init(&export->exp_lock);
        spin_lock_init(&export->exp_rpc_lock);
        INIT_HLIST_NODE(&export->exp_uuid_hash);
        INIT_HLIST_NODE(&export->exp_nid_hash);
        INIT_HLIST_NODE(&export->exp_gen_hash);
        spin_lock_init(&export->exp_bl_list_lock);
        INIT_LIST_HEAD(&export->exp_bl_list);

        export->exp_sp_peer = LUSTRE_SP_ANY;
        export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
        export->exp_client_uuid = *cluuid;
        obd_init_export(export);

        spin_lock(&obd->obd_dev_lock);
        /* shouldn't happen, but might race */
        if (obd->obd_stopping)
                GOTO(exit_unlock, rc = -ENODEV);

        /* keep our own reference on the hash while obd_dev_lock is
         * dropped below */
        hash = cfs_hash_getref(obd->obd_uuid_hash);
        if (hash == NULL)
                GOTO(exit_unlock, rc = -ENODEV);
        spin_unlock(&obd->obd_dev_lock);

        /* an export carrying the device's own uuid is not hashed */
        if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
                rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
                if (rc != 0) {
                        LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
                                      obd->obd_name, cluuid->uuid, rc);
                        GOTO(exit_err, rc = -EALREADY);
                }
        }

        at_init(&export->exp_bl_lock_at, obd_timeout, 0);
        spin_lock(&obd->obd_dev_lock);
        /* re-check: the device may have started stopping while the lock
         * was dropped */
        if (obd->obd_stopping) {
                cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
                GOTO(exit_unlock, rc = -ENODEV);
        }

        class_incref(obd, "export", export);
        list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
        list_add_tail(&export->exp_obd_chain_timed,
                      &export->exp_obd->obd_exports_timed);
        export->exp_obd->obd_num_exports++;
        spin_unlock(&obd->obd_dev_lock);
        cfs_hash_putref(hash);
        RETURN(export);

exit_unlock:
        spin_unlock(&obd->obd_dev_lock);
exit_err:
        /* hash is still NULL if we failed before taking the reference */
        if (hash)
                cfs_hash_putref(hash);
        class_handle_unhash(&export->exp_handle);
        LASSERT(hlist_unhashed(&export->exp_uuid_hash));
        obd_destroy_export(export);
        OBD_FREE_PTR(export);
        return ERR_PTR(rc);
}
EXPORT_SYMBOL(class_new_export);
921
922 void class_unlink_export(struct obd_export *exp)
923 {
924         class_handle_unhash(&exp->exp_handle);
925
926         spin_lock(&exp->exp_obd->obd_dev_lock);
927         /* delete an uuid-export hashitem from hashtables */
928         if (!hlist_unhashed(&exp->exp_uuid_hash))
929                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
930                              &exp->exp_client_uuid,
931                              &exp->exp_uuid_hash);
932
933         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
934         list_del_init(&exp->exp_obd_chain_timed);
935         exp->exp_obd->obd_num_exports--;
936         spin_unlock(&exp->exp_obd->obd_dev_lock);
937         class_export_put(exp);
938 }
939
940 /* Import management functions */
/*
 * Tear down and free an import.
 *
 * Asserts that the refcount is zero and that imp_sec has already been
 * cleared.  Releases the import's own connection and every entry on
 * imp_conn_list, drops the "import" reference on the owning obd_device
 * and finally hands the structure to OBD_FREE_RCU.
 */
941 static void class_import_destroy(struct obd_import *imp)
942 {
943         ENTRY;
944
945         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
946                 imp->imp_obd->obd_name);
947
948         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
949
950         ptlrpc_put_connection_superhack(imp->imp_connection);
951
            /* pop connection entries from the head until the list is empty */
952         while (!list_empty(&imp->imp_conn_list)) {
953                 struct obd_import_conn *imp_conn;
954
955                 imp_conn = list_entry(imp->imp_conn_list.next,
956                                       struct obd_import_conn, oic_item);
957                 list_del_init(&imp_conn->oic_item);
958                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
959                 OBD_FREE(imp_conn, sizeof(*imp_conn));
960         }
961
962         LASSERT(imp->imp_sec == NULL);
            /* balances the class_incref(obd, "import", imp) in class_new_import() */
963         class_decref(imp->imp_obd, "import", imp);
964         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
965         EXIT;
966 }
967
968 static void import_handle_addref(void *import)
969 {
970         class_import_get(import);
971 }
972
973 static struct portals_handle_ops import_handle_ops = {
974         .hop_addref = import_handle_addref,
975         .hop_free   = NULL,
976 };
977
978 struct obd_import *class_import_get(struct obd_import *import)
979 {
980         atomic_inc(&import->imp_refcount);
981         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
982                atomic_read(&import->imp_refcount),
983                import->imp_obd->obd_name);
984         return import;
985 }
986 EXPORT_SYMBOL(class_import_get);
987
988 void class_import_put(struct obd_import *imp)
989 {
990         ENTRY;
991
992         LASSERT(list_empty(&imp->imp_zombie_chain));
993         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
994
995         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
996                atomic_read(&imp->imp_refcount) - 1,
997                imp->imp_obd->obd_name);
998
999         if (atomic_dec_and_test(&imp->imp_refcount)) {
1000                 CDEBUG(D_INFO, "final put import %p\n", imp);
1001                 obd_zombie_import_add(imp);
1002         }
1003
1004         /* catch possible import put race */
1005         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1006         EXIT;
1007 }
1008 EXPORT_SYMBOL(class_import_put);
1009
1010 static void init_imp_at(struct imp_at *at) {
1011         int i;
1012         at_init(&at->iat_net_latency, 0, 0);
1013         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1014                 /* max service estimates are tracked on the server side, so
1015                    don't use the AT history here, just use the last reported
1016                    val. (But keep hist for proc histogram, worst_ever) */
1017                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1018                         AT_FLG_NOHIST);
1019         }
1020 }
1021
/**
 * Allocate and initialise a new import attached to \a obd.
 *
 * The import starts in state LUSTRE_IMP_NEW with a refcount of 2, holds an
 * "import" reference on \a obd (released in class_import_destroy()), and is
 * registered in the handle hash with import_handle_ops.
 *
 * Returns the new import, or NULL if the allocation fails.
 */
1022 struct obd_import *class_new_import(struct obd_device *obd)
1023 {
1024         struct obd_import *imp;
1025
1026         OBD_ALLOC(imp, sizeof(*imp));
1027         if (imp == NULL)
1028                 return NULL;
1029
1030         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1031         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1032         INIT_LIST_HEAD(&imp->imp_replay_list);
1033         INIT_LIST_HEAD(&imp->imp_sending_list);
1034         INIT_LIST_HEAD(&imp->imp_delayed_list);
1035         INIT_LIST_HEAD(&imp->imp_committed_list);
             /* the replay cursor starts out pointing at the (empty) list head */
1036         imp->imp_replay_cursor = &imp->imp_committed_list;
1037         spin_lock_init(&imp->imp_lock);
1038         imp->imp_last_success_conn = 0;
1039         imp->imp_state = LUSTRE_IMP_NEW;
             /* pin the device for as long as the import exists */
1040         imp->imp_obd = class_incref(obd, "import", imp);
1041         mutex_init(&imp->imp_sec_mutex);
1042         init_waitqueue_head(&imp->imp_recovery_waitq);
1043
1044         atomic_set(&imp->imp_refcount, 2);
1045         atomic_set(&imp->imp_unregistering, 0);
1046         atomic_set(&imp->imp_inflight, 0);
1047         atomic_set(&imp->imp_replay_inflight, 0);
1048         atomic_set(&imp->imp_inval_count, 0);
1049         INIT_LIST_HEAD(&imp->imp_conn_list);
1050         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1051         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1052         init_imp_at(&imp->imp_at);
1053
1054         /* the default magic is V2, will be used in connect RPC, and
1055          * then adjusted according to the flags in request/reply. */
1056         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1057
1058         return imp;
1059 }
1060 EXPORT_SYMBOL(class_new_import);
1061
/**
 * Start destruction of an import: remove it from the handle hash, bump
 * imp_generation under imp_lock, and drop one reference.  The structure is
 * only freed once the last reference is put (see class_import_put()).
 */
1062 void class_destroy_import(struct obd_import *import)
1063 {
1064         LASSERT(import != NULL);
1065         LASSERT(import != LP_POISON);
1066
1067         class_handle_unhash(&import->imp_handle);
1068
1069         spin_lock(&import->imp_lock);
1070         import->imp_generation++;
1071         spin_unlock(&import->imp_lock);
1072         class_import_put(import);
1073 }
1074 EXPORT_SYMBOL(class_destroy_import);
1075
1076 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1077
/*
 * Debug tracking (LUSTRE_TRACKS_LOCK_EXP_REFS only): record that \a lock
 * holds one more reference on \a exp.  The first reference links the lock
 * into exp_locks_list and remembers \a exp as the lock's target export.
 * A warning is printed if the lock is already tracked against a different
 * export.  All updates are made under exp_locks_list_guard.
 */
1078 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1079 {
1080         spin_lock(&exp->exp_locks_list_guard);
1081
1082         LASSERT(lock->l_exp_refs_nr >= 0);
1083
1084         if (lock->l_exp_refs_target != NULL &&
1085             lock->l_exp_refs_target != exp) {
1086                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1087                               exp, lock, lock->l_exp_refs_target);
1088         }
             /* 0 -> 1 transition: start tracking this lock on the export */
1089         if ((lock->l_exp_refs_nr ++) == 0) {
1090                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1091                 lock->l_exp_refs_target = exp;
1092         }
             /* NOTE(review): %u is used although the assertion above treats
              * l_exp_refs_nr as signed - confirm the field type */
1093         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1094                lock, exp, lock->l_exp_refs_nr);
1095         spin_unlock(&exp->exp_locks_list_guard);
1096 }
1097
/*
 * Debug tracking (LUSTRE_TRACKS_LOCK_EXP_REFS only): drop one of the
 * references \a lock holds on \a exp.  When the count reaches zero the lock
 * is unlinked from the export's list and its target export is cleared.
 * A warning is printed if the lock was tracked against a different export.
 */
1098 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1099 {
1100         spin_lock(&exp->exp_locks_list_guard);
1101         LASSERT(lock->l_exp_refs_nr > 0);
1102         if (lock->l_exp_refs_target != exp) {
1103                 LCONSOLE_WARN("lock %p, "
1104                               "mismatching export pointers: %p, %p\n",
1105                               lock, lock->l_exp_refs_target, exp);
1106         }
             /* 1 -> 0 transition: stop tracking this lock */
1107         if (-- lock->l_exp_refs_nr == 0) {
1108                 list_del_init(&lock->l_exp_refs_link);
1109                 lock->l_exp_refs_target = NULL;
1110         }
1111         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1112                lock, exp, lock->l_exp_refs_nr);
1113         spin_unlock(&exp->exp_locks_list_guard);
1114 }
1115 #endif
1116
1117 /* A connection defines an export context in which preallocation can
1118    be managed. This releases the export pointer reference, and returns
1119    the export handle, so the export refcount is 1 when this function
1120    returns. */
1121 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1122                   struct obd_uuid *cluuid)
1123 {
1124         struct obd_export *export;
1125         LASSERT(conn != NULL);
1126         LASSERT(obd != NULL);
1127         LASSERT(cluuid != NULL);
1128         ENTRY;
1129
1130         export = class_new_export(obd, cluuid);
1131         if (IS_ERR(export))
1132                 RETURN(PTR_ERR(export));
1133
1134         conn->cookie = export->exp_handle.h_cookie;
1135         class_export_put(export);
1136
1137         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1138                cluuid->uuid, conn->cookie);
1139         RETURN(0);
1140 }
1141 EXPORT_SYMBOL(class_connect);
1142
1143 /* if export is involved in recovery then clean up related things
      *
      * Under obd_recovery_task_lock, and only while the device is recovering:
      * clears exp_in_recovery (decrementing obd_connected_clients) and counts
      * the export as a stale client unless it is a lightweight connection.
      * Then, under exp_lock, clears the request-replay and lock-replay flags
      * and decrements the matching per-device client counters. */
1144 static void class_export_recovery_cleanup(struct obd_export *exp)
1145 {
1146         struct obd_device *obd = exp->exp_obd;
1147
1148         spin_lock(&obd->obd_recovery_task_lock);
1149         if (obd->obd_recovering) {
1150                 if (exp->exp_in_recovery) {
                             /* exp_lock nests inside obd_recovery_task_lock here */
1151                         spin_lock(&exp->exp_lock);
1152                         exp->exp_in_recovery = 0;
1153                         spin_unlock(&exp->exp_lock);
1154                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1155                         atomic_dec(&obd->obd_connected_clients);
1156                 }
1157
1158                 /* if called during recovery then should update
1159                  * obd_stale_clients counter,
1160                  * lightweight exports are not counted */
1161                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1162                         exp->exp_obd->obd_stale_clients++;
1163         }
1164         spin_unlock(&obd->obd_recovery_task_lock);
1165
1166         spin_lock(&exp->exp_lock);
1167         /** Cleanup req replay fields */
1168         if (exp->exp_req_replay_needed) {
1169                 exp->exp_req_replay_needed = 0;
1170
1171                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1172                 atomic_dec(&obd->obd_req_replay_clients);
1173         }
1174
1175         /** Cleanup lock replay data */
1176         if (exp->exp_lock_replay_needed) {
1177                 exp->exp_lock_replay_needed = 0;
1178
1179                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1180                 atomic_dec(&obd->obd_lock_replay_clients);
1181         }
1182         spin_unlock(&exp->exp_lock);
1183 }
1184
1185 /* This function removes 1-3 references from the export:
1186  * 1 - for export pointer passed
1187  * and if disconnect really need
1188  * 2 - removing from hash
1189  * 3 - in class_unlink_export
1190  * The export pointer passed to this function can be destroyed.
      *
      * Returns -EINVAL if \a export is NULL and 0 otherwise, including when
      * the export had already been disconnected by a racing caller. */
1191 int class_disconnect(struct obd_export *export)
1192 {
1193         int already_disconnected;
1194         ENTRY;
1195
1196         if (export == NULL) {
1197                 CWARN("attempting to free NULL export %p\n", export);
1198                 RETURN(-EINVAL);
1199         }
1200
             /* test-and-set exp_disconnected so only one caller does the work */
1201         spin_lock(&export->exp_lock);
1202         already_disconnected = export->exp_disconnected;
1203         export->exp_disconnected = 1;
1204         spin_unlock(&export->exp_lock);
1205
1206         /* class_cleanup(), abort_recovery(), and class_fail_export()
1207          * all end up in here, and if any of them race we shouldn't
1208          * call extra class_export_puts(). */
1209         if (already_disconnected) {
1210                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1211                 GOTO(no_disconn, already_disconnected);
1212         }
1213
1214         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1215                export->exp_handle.h_cookie);
1216
1217         if (!hlist_unhashed(&export->exp_nid_hash))
1218                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1219                              &export->exp_connection->c_peer.nid,
1220                              &export->exp_nid_hash);
1221
1222         class_export_recovery_cleanup(export);
1223         class_unlink_export(export);
1224 no_disconn:
             /* the caller's reference is always dropped */
1225         class_export_put(export);
1226         RETURN(0);
1227 }
1228 EXPORT_SYMBOL(class_disconnect);
1229
1230 /* Return non-zero for a fully connected export */
1231 int class_connected_export(struct obd_export *exp)
1232 {
1233         int connected = 0;
1234
1235         if (exp) {
1236                 spin_lock(&exp->exp_lock);
1237                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1238                 spin_unlock(&exp->exp_lock);
1239         }
1240         return connected;
1241 }
1242 EXPORT_SYMBOL(class_connected_export);
1243
/*
 * Disconnect every export on \a list (linked through exp_obd_chain),
 * storing \a flags in each export's exp_flags first.
 *
 * An export whose client UUID equals its device's UUID is only unlinked
 * from the list, not disconnected.  The loop runs until the list is empty.
 * NOTE(review): this presumes obd_disconnect() takes the export off
 * \a list (e.g. via class_unlink_export()) - confirm for all obd types.
 */
1244 static void class_disconnect_export_list(struct list_head *list,
1245                                          enum obd_option flags)
1246 {
1247         int rc;
1248         struct obd_export *exp;
1249         ENTRY;
1250
1251         /* It's possible that an export may disconnect itself, but
1252          * nothing else will be added to this list. */
1253         while (!list_empty(list)) {
1254                 exp = list_entry(list->next, struct obd_export,
1255                                  exp_obd_chain);
1256                 /* need for safe call CDEBUG after obd_disconnect */
1257                 class_export_get(exp);
1258
1259                 spin_lock(&exp->exp_lock);
1260                 exp->exp_flags = flags;
1261                 spin_unlock(&exp->exp_lock);
1262
1263                 if (obd_uuid_equals(&exp->exp_client_uuid,
1264                                     &exp->exp_obd->obd_uuid)) {
1265                         CDEBUG(D_HA,
1266                                "exp %p export uuid == obd uuid, don't discon\n",
1267                                exp);
1268                         /* Need to delete this now so we don't end up pointing
1269                          * to work_list later when this export is cleaned up. */
1270                         list_del_init(&exp->exp_obd_chain);
1271                         class_export_put(exp);
1272                         continue;
1273                 }
1274
                     /* second reference: this is the one obd_disconnect() consumes */
1275                 class_export_get(exp);
1276                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1277                        "last request at "CFS_TIME_T"\n",
1278                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1279                        exp, exp->exp_last_request_time);
1280                 /* release one export reference anyway */
1281                 rc = obd_disconnect(exp);
1282
1283                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1284                        obd_export_nid2str(exp), exp, rc);
                     /* drop the reference that kept exp valid for the CDEBUG above */
1285                 class_export_put(exp);
1286         }
1287         EXIT;
1288 }
1289
1290 void class_disconnect_exports(struct obd_device *obd)
1291 {
1292         struct list_head work_list;
1293         ENTRY;
1294
1295         /* Move all of the exports from obd_exports to a work list, en masse. */
1296         INIT_LIST_HEAD(&work_list);
1297         spin_lock(&obd->obd_dev_lock);
1298         list_splice_init(&obd->obd_exports, &work_list);
1299         list_splice_init(&obd->obd_delayed_exports, &work_list);
1300         spin_unlock(&obd->obd_dev_lock);
1301
1302         if (!list_empty(&work_list)) {
1303                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1304                        "disconnecting them\n", obd->obd_minor, obd);
1305                 class_disconnect_export_list(&work_list,
1306                                              exp_flags_from_obd(obd));
1307         } else
1308                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1309                        obd->obd_minor, obd);
1310         EXIT;
1311 }
1312 EXPORT_SYMBOL(class_disconnect_exports);
1313
1314 /* Remove exports that have not completed recovery.
      *
      * Walks obd_exports under obd_dev_lock.  The self-export, exports with
      * no last_rcvd slot, exports already marked failed and exports for which
      * \a test_export returns non-zero are left alone.  Every other export is
      * marked failed and moved to a private list, which is then disconnected
      * with OBD_OPT_ABORT_RECOV added to the device's flags.
      *
      * \a test_export is called with both obd_dev_lock and the export's
      * exp_lock held.
1315  */
1316 void class_disconnect_stale_exports(struct obd_device *obd,
1317                                     int (*test_export)(struct obd_export *))
1318 {
1319         struct list_head work_list;
1320         struct obd_export *exp, *n;
1321         int evicted = 0;
1322         ENTRY;
1323
1324         INIT_LIST_HEAD(&work_list);
1325         spin_lock(&obd->obd_dev_lock);
1326         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1327                                  exp_obd_chain) {
1328                 /* don't count self-export as client */
1329                 if (obd_uuid_equals(&exp->exp_client_uuid,
1330                                     &exp->exp_obd->obd_uuid))
1331                         continue;
1332
1333                 /* don't evict clients which have no slot in last_rcvd
1334                  * (e.g. lightweight connection) */
1335                 if (exp->exp_target_data.ted_lr_idx == -1)
1336                         continue;
1337
1338                 spin_lock(&exp->exp_lock);
1339                 if (exp->exp_failed || test_export(exp)) {
1340                         spin_unlock(&exp->exp_lock);
1341                         continue;
1342                 }
1343                 exp->exp_failed = 1;
1344                 spin_unlock(&exp->exp_lock);
1345
1346                 list_move(&exp->exp_obd_chain, &work_list);
1347                 evicted++;
1348                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1349                        obd->obd_name, exp->exp_client_uuid.uuid,
1350                        exp->exp_connection == NULL ? "<unknown>" :
1351                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1352                 print_export_data(exp, "EVICTING", 0);
1353         }
1354         spin_unlock(&obd->obd_dev_lock);
1355
1356         if (evicted)
1357                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1358                               obd->obd_name, evicted);
1359
1360         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1361                                                  OBD_OPT_ABORT_RECOV);
1362         EXIT;
1363 }
1364 EXPORT_SYMBOL(class_disconnect_stale_exports);
1365
/**
 * Mark \a exp as failed and disconnect it.
 *
 * exp_failed is tested and set under exp_lock, so only the first caller
 * proceeds; later callers just log and return.  If obd_dump_on_timeout is
 * set the debug log is dumped before disconnecting.
 */
1366 void class_fail_export(struct obd_export *exp)
1367 {
1368         int rc, already_failed;
1369
1370         spin_lock(&exp->exp_lock);
1371         already_failed = exp->exp_failed;
1372         exp->exp_failed = 1;
1373         spin_unlock(&exp->exp_lock);
1374
1375         if (already_failed) {
1376                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1377                        exp, exp->exp_client_uuid.uuid);
1378                 return;
1379         }
1380
1381         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1382                exp, exp->exp_client_uuid.uuid);
1383
1384         if (obd_dump_on_timeout)
1385                 libcfs_debug_dumplog();
1386
1387         /* need for safe call CDEBUG after obd_disconnect */
1388         class_export_get(exp);
1389
1390         /* Most callers into obd_disconnect are removing their own reference
1391          * (request, for example) in addition to the one from the hash table.
1392          * We don't have such a reference here, so make one. */
1393         class_export_get(exp);
1394         rc = obd_disconnect(exp);
1395         if (rc)
1396                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1397         else
1398                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1399                        exp, exp->exp_client_uuid.uuid);
             /* release the reference taken for the logging above */
1400         class_export_put(exp);
1401 }
1402 EXPORT_SYMBOL(class_fail_export);
1403
1404 char *obd_export_nid2str(struct obd_export *exp)
1405 {
1406         if (exp->exp_connection != NULL)
1407                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1408
1409         return "(no nid)";
1410 }
1411 EXPORT_SYMBOL(obd_export_nid2str);
1412
/**
 * Evict every export of \a obd whose peer NID matches the string \a nid.
 *
 * Returns the number of exports evicted; 0 if the device is already
 * stopping or no export matches.
 */
1413 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1414 {
1415         struct cfs_hash *nid_hash;
1416         struct obd_export *doomed_exp = NULL;
1417         int exports_evicted = 0;
1418
1419         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1420
1421         spin_lock(&obd->obd_dev_lock);
1422         /* umount has run already, so evict thread should leave
1423          * its task to umount thread now */
1424         if (obd->obd_stopping) {
1425                 spin_unlock(&obd->obd_dev_lock);
1426                 return exports_evicted;
1427         }
1428         nid_hash = obd->obd_nid_hash;
             /* NOTE(review): class_new_export() checks the pointer returned by
              * cfs_hash_getref() for NULL; the result is ignored here - confirm
              * that the obd_stopping check above makes that safe */
1429         cfs_hash_getref(nid_hash);
1430         spin_unlock(&obd->obd_dev_lock);
1431
             /* keep looking the NID up until no export is found for it;
              * NOTE(review): presumably class_fail_export() ends up removing
              * the export from the NID hash, otherwise this would not
              * terminate - confirm */
1432         do {
1433                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1434                 if (doomed_exp == NULL)
1435                         break;
1436
1437                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1438                          "nid %s found, wanted nid %s, requested nid %s\n",
1439                          obd_export_nid2str(doomed_exp),
1440                          libcfs_nid2str(nid_key), nid);
1441                 LASSERTF(doomed_exp != obd->obd_self_export,
1442                          "self-export is hashed by NID?\n");
1443                 exports_evicted++;
1444                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1445                               "request\n", obd->obd_name,
1446                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1447                               obd_export_nid2str(doomed_exp));
1448                 class_fail_export(doomed_exp);
1449                 class_export_put(doomed_exp);
1450         } while (1);
1451
1452         cfs_hash_putref(nid_hash);
1453
1454         if (!exports_evicted)
1455                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1456                        obd->obd_name, nid);
1457         return exports_evicted;
1458 }
1459 EXPORT_SYMBOL(obd_export_evict_by_nid);
1460
/**
 * Evict the export of \a obd whose client UUID is \a uuid.
 *
 * Refuses to evict the device's own UUID.  Returns the number of exports
 * evicted (0 or 1); 0 is also returned when the device is stopping.
 */
1461 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1462 {
1463         struct cfs_hash *uuid_hash;
1464         struct obd_export *doomed_exp = NULL;
1465         struct obd_uuid doomed_uuid;
1466         int exports_evicted = 0;
1467
1468         spin_lock(&obd->obd_dev_lock);
1469         if (obd->obd_stopping) {
1470                 spin_unlock(&obd->obd_dev_lock);
1471                 return exports_evicted;
1472         }
1473         uuid_hash = obd->obd_uuid_hash;
             /* NOTE(review): the pointer returned by cfs_hash_getref() is not
              * checked here, unlike in class_new_export() - confirm */
1474         cfs_hash_getref(uuid_hash);
1475         spin_unlock(&obd->obd_dev_lock);
1476
1477         obd_str2uuid(&doomed_uuid, uuid);
1478         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1479                 CERROR("%s: can't evict myself\n", obd->obd_name);
1480                 cfs_hash_putref(uuid_hash);
1481                 return exports_evicted;
1482         }
1483
1484         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1485
1486         if (doomed_exp == NULL) {
1487                 CERROR("%s: can't disconnect %s: no exports found\n",
1488                        obd->obd_name, uuid);
1489         } else {
1490                 CWARN("%s: evicting %s at adminstrative request\n",
1491                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1492                 class_fail_export(doomed_exp);
1493                 class_export_put(doomed_exp);
1494                 exports_evicted++;
1495         }
1496         cfs_hash_putref(uuid_hash);
1497
1498         return exports_evicted;
1499 }
1500
1501 #if LUSTRE_TRACKS_LOCK_EXP_REFS
     /* Optional callback invoked by print_export_data() when it is asked to
      * dump locks; NULL means no hook is installed. */
1502 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1503 #endif
1504
/*
 * Log a one-line D_HA summary of \a exp, labelled with \a status: names,
 * reference and activity counters, state flags, the number of outstanding
 * replies with the address of the first one, and exp_last_committed.
 *
 * When built with LUSTRE_TRACKS_LOCK_EXP_REFS and \a locks is non-zero,
 * class_export_dump_hook is also called if it is set.
 */
1505 static void print_export_data(struct obd_export *exp, const char *status,
1506                               int locks)
1507 {
1508         struct ptlrpc_reply_state *rs;
1509         struct ptlrpc_reply_state *first_reply = NULL;
1510         int nreplies = 0;
1511
             /* count the outstanding replies and remember the first one */
1512         spin_lock(&exp->exp_lock);
1513         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1514                             rs_exp_list) {
1515                 if (nreplies == 0)
1516                         first_reply = rs;
1517                 nreplies++;
1518         }
1519         spin_unlock(&exp->exp_lock);
1520
             /* first_reply is only printed as an address, never dereferenced */
1521         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1522                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1523                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1524                atomic_read(&exp->exp_rpc_count),
1525                atomic_read(&exp->exp_cb_count),
1526                atomic_read(&exp->exp_locks_count),
1527                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1528                nreplies, first_reply, nreplies > 3 ? "..." : "",
1529                exp->exp_last_committed);
1530 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1531         if (locks && class_export_dump_hook != NULL)
1532                 class_export_dump_hook(exp);
1533 #endif
1534 }
1535
/**
 * Log every export known for \a obd via print_export_data().
 *
 * The device's active, unlinked and delayed lists are walked under
 * obd_dev_lock.  The zombie export list is then walked under
 * obd_zombie_impexp_lock; it is a file-wide list, not one belonging
 * to \a obd.  \a locks is passed through to print_export_data().
 */
1536 void dump_exports(struct obd_device *obd, int locks)
1537 {
1538         struct obd_export *exp;
1539
1540         spin_lock(&obd->obd_dev_lock);
1541         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1542                 print_export_data(exp, "ACTIVE", locks);
1543         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1544                 print_export_data(exp, "UNLINKED", locks);
1545         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1546                 print_export_data(exp, "DELAYED", locks);
1547         spin_unlock(&obd->obd_dev_lock);
1548         spin_lock(&obd_zombie_impexp_lock);
1549         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1550                 print_export_data(exp, "ZOMBIE", locks);
1551         spin_unlock(&obd_zombie_impexp_lock);
1552 }
1553
/**
 * Block until \a obd has no unlinked exports left.
 *
 * The active export list must already be empty (asserted).  The list is
 * polled with an uninterruptible sleep that starts at 2 seconds and doubles
 * after every pass.  Once the sleep interval exceeds 5 seconds, a console
 * warning is printed and all exports are dumped after each sleep.
 */
1554 void obd_exports_barrier(struct obd_device *obd)
1555 {
1556         int waited = 2;
1557         LASSERT(list_empty(&obd->obd_exports));
1558         spin_lock(&obd->obd_dev_lock);
1559         while (!list_empty(&obd->obd_unlinked_exports)) {
                     /* never sleep while holding the spinlock */
1560                 spin_unlock(&obd->obd_dev_lock);
1561                 set_current_state(TASK_UNINTERRUPTIBLE);
1562                 schedule_timeout(cfs_time_seconds(waited));
                     /* waited is the length of the sleep just taken,
                      * not the total time spent waiting */
1563                 if (waited > 5 && IS_PO2(waited)) {
1564                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1565                                       "more than %d seconds. "
1566                                       "The obd refcount = %d. Is it stuck?\n",
1567                                       obd->obd_name, waited,
1568                                       atomic_read(&obd->obd_refcount));
1569                         dump_exports(obd, 1);
1570                 }
1571                 waited *= 2;
1572                 spin_lock(&obd->obd_dev_lock);
1573         }
1574         spin_unlock(&obd->obd_dev_lock);
1575 }
1576 EXPORT_SYMBOL(obd_exports_barrier);
1577
1578 /* Total number of zombie imports and exports still to be destroyed.
      * Every access in this file is made under obd_zombie_impexp_lock. */
1579 static int zombies_count = 0;
1580
1581 /**
1582  * kill zombie imports and exports
      *
      * Each pass takes at most one import and one export off the zombie
      * lists under obd_zombie_impexp_lock, destroys them with the lock
      * released, and then decrements zombies_count.  The loop ends on the
      * first pass that finds both lists empty.
1583  */
1584 void obd_zombie_impexp_cull(void)
1585 {
1586         struct obd_import *import;
1587         struct obd_export *export;
1588         ENTRY;
1589
1590         do {
1591                 spin_lock(&obd_zombie_impexp_lock);
1592
1593                 import = NULL;
1594                 if (!list_empty(&obd_zombie_imports)) {
1595                         import = list_entry(obd_zombie_imports.next,
1596                                             struct obd_import,
1597                                             imp_zombie_chain);
1598                         list_del_init(&import->imp_zombie_chain);
1599                 }
1600
1601                 export = NULL;
1602                 if (!list_empty(&obd_zombie_exports)) {
1603                         export = list_entry(obd_zombie_exports.next,
1604                                             struct obd_export,
1605                                             exp_obd_chain);
1606                         list_del_init(&export->exp_obd_chain);
1607                 }
1608
1609                 spin_unlock(&obd_zombie_impexp_lock);
1610
                     /* the counter is only decremented after the destroy has run,
                      * so it also covers the object currently being destroyed */
1611                 if (import != NULL) {
1612                         class_import_destroy(import);
1613                         spin_lock(&obd_zombie_impexp_lock);
1614                         zombies_count--;
1615                         spin_unlock(&obd_zombie_impexp_lock);
1616                 }
1617
1618                 if (export != NULL) {
1619                         class_export_destroy(export);
1620                         spin_lock(&obd_zombie_impexp_lock);
1621                         zombies_count--;
1622                         spin_unlock(&obd_zombie_impexp_lock);
1623                 }
1624
1625                 cond_resched();
1626         } while (import != NULL || export != NULL);
1627         EXIT;
1628 }
1629
/*
 * State of the zombie import/export thread:
 *  - obd_zombie_start is completed by the thread when it starts running;
 *  - obd_zombie_flags holds the OBD_ZOMBIE_STOP flag;
 *  - obd_zombie_waitq is woken whenever a new zombie is queued;
 *  - obd_zombie_pid records the thread's pid so that obd_zombie_barrier()
 *    can avoid waiting on itself.
 *
 * OBD_ZOMBIE_STOP is passed to test_bit() as a bit number, not used as a
 * mask.
 */
1630 static struct completion        obd_zombie_start;
1631 static struct completion        obd_zombie_stop;
1632 static unsigned long            obd_zombie_flags;
1633 static wait_queue_head_t        obd_zombie_waitq;
1634 static pid_t                    obd_zombie_pid;
1635
1636 enum {
1637         OBD_ZOMBIE_STOP         = 0x0001,
1638 };
1639
1640 /**
1641  * check for work for kill zombie import/export thread.
      *
      * Returns non-zero when there is nothing to do, i.e. zombies_count is
      * zero and OBD_ZOMBIE_STOP is not set; returns 0 when zombies are pending
      * or a stop has been requested.  \a arg is unused.
1642  */
1643 static int obd_zombie_impexp_check(void *arg)
1644 {
1645         int rc;
1646
1647         spin_lock(&obd_zombie_impexp_lock);
1648         rc = (zombies_count == 0) &&
1649              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1650         spin_unlock(&obd_zombie_impexp_lock);
1651
1652         RETURN(rc);
1653 }
1654
1655 /**
1656  * Add export to the obd_zombie thread and notify it.
      *
      * The export must currently be linked on a list through exp_obd_chain
      * (asserted).  It is removed from that list under the device's
      * obd_dev_lock, then added to obd_zombie_exports and counted in
      * zombies_count under obd_zombie_impexp_lock.
1657  */
1658 static void obd_zombie_export_add(struct obd_export *exp) {
1659         spin_lock(&exp->exp_obd->obd_dev_lock);
1660         LASSERT(!list_empty(&exp->exp_obd_chain));
1661         list_del_init(&exp->exp_obd_chain);
1662         spin_unlock(&exp->exp_obd->obd_dev_lock);
1663         spin_lock(&obd_zombie_impexp_lock);
1664         zombies_count++;
1665         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1666         spin_unlock(&obd_zombie_impexp_lock);
1667
1668         obd_zombie_impexp_notify();
1669 }
1670
1671 /**
1672  * Add import to the obd_zombie thread and notify it.
      *
      * The import must have no security context (imp_sec == NULL) and must
      * not already be on the zombie list; both are asserted.  It is added to
      * obd_zombie_imports and counted in zombies_count under
      * obd_zombie_impexp_lock.
1673  */
1674 static void obd_zombie_import_add(struct obd_import *imp) {
1675         LASSERT(imp->imp_sec == NULL);
1676         spin_lock(&obd_zombie_impexp_lock);
1677         LASSERT(list_empty(&imp->imp_zombie_chain));
1678         zombies_count++;
1679         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1680         spin_unlock(&obd_zombie_impexp_lock);
1681
1682         obd_zombie_impexp_notify();
1683 }
1684
1685 /**
1686  * notify import/export destroy thread about new zombie.
1687  */
1688 static void obd_zombie_impexp_notify(void)
1689 {
1690         /*
1691          * Make sure obd_zomebie_impexp_thread get this notification.
1692          * It is possible this signal only get by obd_zombie_barrier, and
1693          * barrier gulps this notification and sleeps away and hangs ensues
1694          */
1695         wake_up_all(&obd_zombie_waitq);
1696 }
1697
1698 /**
1699  * check whether obd_zombie is idle
1700  */
1701 static int obd_zombie_is_idle(void)
1702 {
1703         int rc;
1704
1705         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1706         spin_lock(&obd_zombie_impexp_lock);
1707         rc = (zombies_count == 0);
1708         spin_unlock(&obd_zombie_impexp_lock);
1709         return rc;
1710 }
1711
1712 /**
1713  * wait when obd_zombie import/export queues become empty
1714  */
1715 void obd_zombie_barrier(void)
1716 {
1717         struct l_wait_info lwi = { 0 };
1718
1719         if (obd_zombie_pid == current_pid())
1720                 /* don't wait for myself */
1721                 return;
1722         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1723 }
1724 EXPORT_SYMBOL(obd_zombie_barrier);
1725
1726
1727 /**
1728  * destroy zombie export/import thread.
1729  */
1730 static int obd_zombie_impexp_thread(void *unused)
1731 {
1732         unshare_fs_struct();
1733         complete(&obd_zombie_start);
1734
1735         obd_zombie_pid = current_pid();
1736
1737         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1738                 struct l_wait_info lwi = { 0 };
1739
1740                 l_wait_event(obd_zombie_waitq,
1741                              !obd_zombie_impexp_check(NULL), &lwi);
1742                 obd_zombie_impexp_cull();
1743
1744                 /*
1745                  * Notify obd_zombie_barrier callers that queues
1746                  * may be empty.
1747                  */
1748                 wake_up(&obd_zombie_waitq);
1749         }
1750
1751         complete(&obd_zombie_stop);
1752
1753         RETURN(0);
1754 }
1755
1756
1757 /**
1758  * start destroy zombie import/export thread
1759  */
1760 int obd_zombie_impexp_init(void)
1761 {
1762         struct task_struct *task;
1763
1764         INIT_LIST_HEAD(&obd_zombie_imports);
1765
1766         INIT_LIST_HEAD(&obd_zombie_exports);
1767         spin_lock_init(&obd_zombie_impexp_lock);
1768         init_completion(&obd_zombie_start);
1769         init_completion(&obd_zombie_stop);
1770         init_waitqueue_head(&obd_zombie_waitq);
1771         obd_zombie_pid = 0;
1772
1773         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1774         if (IS_ERR(task))
1775                 RETURN(PTR_ERR(task));
1776
1777         wait_for_completion(&obd_zombie_start);
1778         RETURN(0);
1779 }
1780 /**
1781  * stop destroy zombie import/export thread
1782  */
1783 void obd_zombie_impexp_stop(void)
1784 {
1785         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1786         obd_zombie_impexp_notify();
1787         wait_for_completion(&obd_zombie_stop);
1788 }
1789
1790 /***** Kernel-userspace comm helpers *******/
1791
1792 /* Get length of entire message, including header */
1793 int kuc_len(int payload_len)
1794 {
1795         return sizeof(struct kuc_hdr) + payload_len;
1796 }
1797 EXPORT_SYMBOL(kuc_len);
1798
1799 /* Get a pointer to kuc header, given a ptr to the payload
1800  * @param p Pointer to payload area
1801  * @returns Pointer to kuc header
1802  */
1803 struct kuc_hdr * kuc_ptr(void *p)
1804 {
1805         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1806         LASSERT(lh->kuc_magic == KUC_MAGIC);
1807         return lh;
1808 }
1809 EXPORT_SYMBOL(kuc_ptr);
1810
1811 /* Test if payload is part of kuc message
1812  * @param p Pointer to payload area
1813  * @returns boolean
1814  */
1815 int kuc_ispayload(void *p)
1816 {
1817         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1818
1819         if (kh->kuc_magic == KUC_MAGIC)
1820                 return 1;
1821         else
1822                 return 0;
1823 }
1824 EXPORT_SYMBOL(kuc_ispayload);
1825
1826 /* Alloc space for a message, and fill in header
1827  * @return Pointer to payload area
1828  */
1829 void *kuc_alloc(int payload_len, int transport, int type)
1830 {
1831         struct kuc_hdr *lh;
1832         int len = kuc_len(payload_len);
1833
1834         OBD_ALLOC(lh, len);
1835         if (lh == NULL)
1836                 return ERR_PTR(-ENOMEM);
1837
1838         lh->kuc_magic = KUC_MAGIC;
1839         lh->kuc_transport = transport;
1840         lh->kuc_msgtype = type;
1841         lh->kuc_msglen = len;
1842
1843         return (void *)(lh + 1);
1844 }
1845 EXPORT_SYMBOL(kuc_alloc);
1846
/* Free a message obtained from kuc_alloc()
 * @param p Pointer to payload area (not to the header)
 * @param payload_len Payload length the message was allocated with
 *
 * This is an exported symbol, so it is a plain external function: a
 * non-static "inline" does not reliably emit an out-of-line definition
 * under C99 inline semantics, and callers in other modules could not
 * inline it anyway.
 */
void kuc_free(void *p, int payload_len)
{
        /* kuc_ptr() asserts the header magic before the memory is freed */
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1854
1855 struct obd_request_slot_waiter {
1856         struct list_head        orsw_entry;
1857         wait_queue_head_t       orsw_waitq;
1858         bool                    orsw_signaled;
1859 };
1860
1861 static bool obd_request_slot_avail(struct client_obd *cli,
1862                                    struct obd_request_slot_waiter *orsw)
1863 {
1864         bool avail;
1865
1866         spin_lock(&cli->cl_loi_list_lock);
1867         avail = !!list_empty(&orsw->orsw_entry);
1868         spin_unlock(&cli->cl_loi_list_lock);
1869
1870         return avail;
1871 };
1872
1873 /*
1874  * For network flow control, the RPC sponsor needs to acquire a credit
1875  * before sending the RPC. The credits count for a connection is defined
1876  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1877  * the subsequent RPC sponsors need to wait until others released their
1878  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1879  */
1880 int obd_get_request_slot(struct client_obd *cli)
1881 {
1882         struct obd_request_slot_waiter   orsw;
1883         struct l_wait_info               lwi;
1884         int                              rc;
1885
1886         spin_lock(&cli->cl_loi_list_lock);
1887         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1888                 cli->cl_r_in_flight++;
1889                 spin_unlock(&cli->cl_loi_list_lock);
1890                 return 0;
1891         }
1892
1893         init_waitqueue_head(&orsw.orsw_waitq);
1894         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1895         orsw.orsw_signaled = false;
1896         spin_unlock(&cli->cl_loi_list_lock);
1897
1898         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1899         rc = l_wait_event(orsw.orsw_waitq,
1900                           obd_request_slot_avail(cli, &orsw) ||
1901                           orsw.orsw_signaled,
1902                           &lwi);
1903
1904         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1905          * freed but other (such as obd_put_request_slot) is using it. */
1906         spin_lock(&cli->cl_loi_list_lock);
1907         if (rc != 0) {
1908                 if (!orsw.orsw_signaled) {
1909                         if (list_empty(&orsw.orsw_entry))
1910                                 cli->cl_r_in_flight--;
1911                         else
1912                                 list_del(&orsw.orsw_entry);
1913                 }
1914         }
1915
1916         if (orsw.orsw_signaled) {
1917                 LASSERT(list_empty(&orsw.orsw_entry));
1918
1919                 rc = -EINTR;
1920         }
1921         spin_unlock(&cli->cl_loi_list_lock);
1922
1923         return rc;
1924 }
1925 EXPORT_SYMBOL(obd_get_request_slot);
1926
1927 void obd_put_request_slot(struct client_obd *cli)
1928 {
1929         struct obd_request_slot_waiter *orsw;
1930
1931         spin_lock(&cli->cl_loi_list_lock);
1932         cli->cl_r_in_flight--;
1933
1934         /* If there is free slot, wakeup the first waiter. */
1935         if (!list_empty(&cli->cl_loi_read_list) &&
1936             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
1937                 orsw = list_entry(cli->cl_loi_read_list.next,
1938                                   struct obd_request_slot_waiter, orsw_entry);
1939                 list_del_init(&orsw->orsw_entry);
1940                 cli->cl_r_in_flight++;
1941                 wake_up(&orsw->orsw_waitq);
1942         }
1943         spin_unlock(&cli->cl_loi_list_lock);
1944 }
1945 EXPORT_SYMBOL(obd_put_request_slot);
1946
1947 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1948 {
1949         return cli->cl_max_rpcs_in_flight;
1950 }
1951 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
1952
1953 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1954 {
1955         struct obd_request_slot_waiter *orsw;
1956         __u32                           old;
1957         int                             diff;
1958         int                             i;
1959         char                            *typ_name;
1960         int                             rc;
1961
1962         if (max > OBD_MAX_RIF_MAX || max < 1)
1963                 return -ERANGE;
1964
1965         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
1966         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
1967                 /* adjust max_mod_rpcs_in_flight to ensure it is always
1968                  * strictly lower that max_rpcs_in_flight */
1969                 if (max < 2) {
1970                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
1971                                "because it must be higher than "
1972                                "max_mod_rpcs_in_flight value",
1973                                cli->cl_import->imp_obd->obd_name);
1974                         return -ERANGE;
1975                 }
1976                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
1977                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
1978                         if (rc != 0)
1979                                 return rc;
1980                 }
1981         }
1982
1983         spin_lock(&cli->cl_loi_list_lock);
1984         old = cli->cl_max_rpcs_in_flight;
1985         cli->cl_max_rpcs_in_flight = max;
1986         diff = max - old;
1987
1988         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
1989         for (i = 0; i < diff; i++) {
1990                 if (list_empty(&cli->cl_loi_read_list))
1991                         break;
1992
1993                 orsw = list_entry(cli->cl_loi_read_list.next,
1994                                   struct obd_request_slot_waiter, orsw_entry);
1995                 list_del_init(&orsw->orsw_entry);
1996                 cli->cl_r_in_flight++;
1997                 wake_up(&orsw->orsw_waitq);
1998         }
1999         spin_unlock(&cli->cl_loi_list_lock);
2000
2001         return 0;
2002 }
2003 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2004
2005 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2006 {
2007         return cli->cl_max_mod_rpcs_in_flight;
2008 }
2009 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2010
2011 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2012 {
2013         struct obd_connect_data *ocd;
2014         __u16 maxmodrpcs;
2015         __u16 prev;
2016
2017         if (max > OBD_MAX_RIF_MAX || max < 1)
2018                 return -ERANGE;
2019
2020         /* cannot exceed or equal max_rpcs_in_flight */
2021         if (max >= cli->cl_max_rpcs_in_flight) {
2022                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2023                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2024                        cli->cl_import->imp_obd->obd_name,
2025                        max, cli->cl_max_rpcs_in_flight);
2026                 return -ERANGE;
2027         }
2028
2029         /* cannot exceed max modify RPCs in flight supported by the server */
2030         ocd = &cli->cl_import->imp_connect_data;
2031         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2032                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2033         else
2034                 maxmodrpcs = 1;
2035         if (max > maxmodrpcs) {
2036                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2037                        "higher than max_mod_rpcs_per_client value (%hu) "
2038                        "returned by the server at connection\n",
2039                        cli->cl_import->imp_obd->obd_name,
2040                        max, maxmodrpcs);
2041                 return -ERANGE;
2042         }
2043
2044         spin_lock(&cli->cl_mod_rpcs_lock);
2045
2046         prev = cli->cl_max_mod_rpcs_in_flight;
2047         cli->cl_max_mod_rpcs_in_flight = max;
2048
2049         /* wakeup waiters if limit has been increased */
2050         if (cli->cl_max_mod_rpcs_in_flight > prev)
2051                 wake_up(&cli->cl_mod_rpcs_waitq);
2052
2053         spin_unlock(&cli->cl_mod_rpcs_lock);
2054
2055         return 0;
2056 }
2057 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2058
2059
2060 #define pct(a, b) (b ? a * 100 / b : 0)
2061 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2062                                struct seq_file *seq)
2063 {
2064         struct timeval now;
2065         unsigned long mod_tot = 0, mod_cum;
2066         int i;
2067
2068         do_gettimeofday(&now);
2069
2070         spin_lock(&cli->cl_mod_rpcs_lock);
2071
2072         seq_printf(seq, "snapshot_time:         %lu.%lu (secs.usecs)\n",
2073                    now.tv_sec, now.tv_usec);
2074         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2075                    cli->cl_mod_rpcs_in_flight);
2076
2077         seq_printf(seq, "\n\t\t\tmodify\n");
2078         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2079
2080         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2081
2082         mod_cum = 0;
2083         for (i = 0; i < OBD_HIST_MAX; i++) {
2084                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2085                 mod_cum += mod;
2086                 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2087                                  i, mod, pct(mod, mod_tot),
2088                                  pct(mod_cum, mod_tot));
2089                 if (mod_cum == mod_tot)
2090                         break;
2091         }
2092
2093         spin_unlock(&cli->cl_mod_rpcs_lock);
2094
2095         return 0;
2096 }
2097 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2098 #undef pct
2099
2100
2101 /* The number of modify RPCs sent in parallel is limited
2102  * because the server has a finite number of slots per client to
2103  * store request result and ensure reply reconstruction when needed.
2104  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2105  * that takes into account server limit and cl_max_rpcs_in_flight
2106  * value.
2107  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2108  * one close request is allowed above the maximum.
2109  */
2110 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2111                                                  bool close_req)
2112 {
2113         bool avail;
2114
2115         /* A slot is available if
2116          * - number of modify RPCs in flight is less than the max
2117          * - it's a close RPC and no other close request is in flight
2118          */
2119         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2120                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2121
2122         return avail;
2123 }
2124
2125 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2126                                          bool close_req)
2127 {
2128         bool avail;
2129
2130         spin_lock(&cli->cl_mod_rpcs_lock);
2131         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2132         spin_unlock(&cli->cl_mod_rpcs_lock);
2133         return avail;
2134 }
2135
2136 /* Get a modify RPC slot from the obd client @cli according
2137  * to the kind of operation @opc that is going to be sent
2138  * and the intent @it of the operation if it applies.
2139  * If the maximum number of modify RPCs in flight is reached
2140  * the thread is put to sleep.
2141  * Returns the tag to be set in the request message. Tag 0
2142  * is reserved for non-modifying requests.
2143  */
2144 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2145                            struct lookup_intent *it)
2146 {
2147         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2148         bool                    close_req = false;
2149         __u16                   i, max;
2150
2151         /* read-only metadata RPCs don't consume a slot on MDT
2152          * for reply reconstruction
2153          */
2154         if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2155                            it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2156                 return 0;
2157
2158         if (opc == MDS_CLOSE)
2159                 close_req = true;
2160
2161         do {
2162                 spin_lock(&cli->cl_mod_rpcs_lock);
2163                 max = cli->cl_max_mod_rpcs_in_flight;
2164                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2165                         /* there is a slot available */
2166                         cli->cl_mod_rpcs_in_flight++;
2167                         if (close_req)
2168                                 cli->cl_close_rpcs_in_flight++;
2169                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2170                                          cli->cl_mod_rpcs_in_flight);
2171                         /* find a free tag */
2172                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2173                                                 max + 1);
2174                         LASSERT(i < OBD_MAX_RIF_MAX);
2175                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2176                         spin_unlock(&cli->cl_mod_rpcs_lock);
2177                         /* tag 0 is reserved for non-modify RPCs */
2178                         return i + 1;
2179                 }
2180                 spin_unlock(&cli->cl_mod_rpcs_lock);
2181
2182                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2183                        "opc %u, max %hu\n",
2184                        cli->cl_import->imp_obd->obd_name, opc, max);
2185
2186                 l_wait_event(cli->cl_mod_rpcs_waitq,
2187                              obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2188         } while (true);
2189 }
2190 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2191
2192 /* Put a modify RPC slot from the obd client @cli according
2193  * to the kind of operation @opc that has been sent and the
2194  * intent @it of the operation if it applies.
2195  */
2196 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2197                           struct lookup_intent *it, __u16 tag)
2198 {
2199         bool                    close_req = false;
2200
2201         if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2202                            it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2203                 return;
2204
2205         if (opc == MDS_CLOSE)
2206                 close_req = true;
2207
2208         spin_lock(&cli->cl_mod_rpcs_lock);
2209         cli->cl_mod_rpcs_in_flight--;
2210         if (close_req)
2211                 cli->cl_close_rpcs_in_flight--;
2212         /* release the tag in the bitmap */
2213         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2214         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2215         spin_unlock(&cli->cl_mod_rpcs_lock);
2216         wake_up(&cli->cl_mod_rpcs_waitq);
2217 }
2218 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2219