Whamcloud - gitweb
LU-5319 mdt: support multiple modify RPCs in parallel
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43
44 #include <linux/kthread.h>
45 #include <obd_class.h>
46 #include <lprocfs_status.h>
47 #include <lustre_kernelcomm.h>
48
49 spinlock_t obd_types_lock;
50
51 static struct kmem_cache *obd_device_cachep;
52 struct kmem_cache *obdo_cachep;
53 EXPORT_SYMBOL(obdo_cachep);
54 static struct kmem_cache *import_cachep;
55
56 static struct list_head obd_zombie_imports;
57 static struct list_head obd_zombie_exports;
58 static spinlock_t  obd_zombie_impexp_lock;
59
60 static void obd_zombie_impexp_notify(void);
61 static void obd_zombie_export_add(struct obd_export *exp);
62 static void obd_zombie_import_add(struct obd_import *imp);
63 static void print_export_data(struct obd_export *exp,
64                               const char *status, int locks);
65
66 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
68
69 /*
70  * support functions: we could use inter-module communication, but this
71  * is more portable to other OS's
72  */
73 static struct obd_device *obd_device_alloc(void)
74 {
75         struct obd_device *obd;
76
77         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
78         if (obd != NULL) {
79                 obd->obd_magic = OBD_DEVICE_MAGIC;
80         }
81         return obd;
82 }
83
84 static void obd_device_free(struct obd_device *obd)
85 {
86         LASSERT(obd != NULL);
87         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
89         if (obd->obd_namespace != NULL) {
90                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91                        obd, obd->obd_namespace, obd->obd_force);
92                 LBUG();
93         }
94         lu_ref_fini(&obd->obd_reference);
95         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
96 }
97
98 struct obd_type *class_search_type(const char *name)
99 {
100         struct list_head *tmp;
101         struct obd_type *type;
102
103         spin_lock(&obd_types_lock);
104         list_for_each(tmp, &obd_types) {
105                 type = list_entry(tmp, struct obd_type, typ_chain);
106                 if (strcmp(type->typ_name, name) == 0) {
107                         spin_unlock(&obd_types_lock);
108                         return type;
109                 }
110         }
111         spin_unlock(&obd_types_lock);
112         return NULL;
113 }
114 EXPORT_SYMBOL(class_search_type);
115
116 struct obd_type *class_get_type(const char *name)
117 {
118         struct obd_type *type = class_search_type(name);
119
120 #ifdef HAVE_MODULE_LOADING_SUPPORT
121         if (!type) {
122                 const char *modname = name;
123
124                 if (strcmp(modname, "obdfilter") == 0)
125                         modname = "ofd";
126
127                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
128                         modname = LUSTRE_OSP_NAME;
129
130                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
131                         modname = LUSTRE_MDT_NAME;
132
133                 if (!request_module("%s", modname)) {
134                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
135                         type = class_search_type(name);
136                 } else {
137                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
138                                            modname);
139                 }
140         }
141 #endif
142         if (type) {
143                 spin_lock(&type->obd_type_lock);
144                 type->typ_refcnt++;
145                 try_module_get(type->typ_dt_ops->o_owner);
146                 spin_unlock(&type->obd_type_lock);
147         }
148         return type;
149 }
150
151 void class_put_type(struct obd_type *type)
152 {
153         LASSERT(type);
154         spin_lock(&type->obd_type_lock);
155         type->typ_refcnt--;
156         module_put(type->typ_dt_ops->o_owner);
157         spin_unlock(&type->obd_type_lock);
158 }
159
160 #define CLASS_MAX_NAME 1024
161
162 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
163                         bool enable_proc, struct lprocfs_vars *vars,
164                         const char *name, struct lu_device_type *ldt)
165 {
166         struct obd_type *type;
167         int rc = 0;
168         ENTRY;
169
170         /* sanity check */
171         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
172
173         if (class_search_type(name)) {
174                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
175                 RETURN(-EEXIST);
176         }
177
178         rc = -ENOMEM;
179         OBD_ALLOC(type, sizeof(*type));
180         if (type == NULL)
181                 RETURN(rc);
182
183         OBD_ALLOC_PTR(type->typ_dt_ops);
184         OBD_ALLOC_PTR(type->typ_md_ops);
185         OBD_ALLOC(type->typ_name, strlen(name) + 1);
186
187         if (type->typ_dt_ops == NULL ||
188             type->typ_md_ops == NULL ||
189             type->typ_name == NULL)
190                 GOTO (failed, rc);
191
192         *(type->typ_dt_ops) = *dt_ops;
193         /* md_ops is optional */
194         if (md_ops)
195                 *(type->typ_md_ops) = *md_ops;
196         strcpy(type->typ_name, name);
197         spin_lock_init(&type->obd_type_lock);
198
199 #ifdef CONFIG_PROC_FS
200         if (enable_proc) {
201                 type->typ_procroot = lprocfs_register(type->typ_name,
202                                                       proc_lustre_root,
203                                                       vars, type);
204                 if (IS_ERR(type->typ_procroot)) {
205                         rc = PTR_ERR(type->typ_procroot);
206                         type->typ_procroot = NULL;
207                         GOTO(failed, rc);
208                 }
209         }
210 #endif
211         if (ldt != NULL) {
212                 type->typ_lu = ldt;
213                 rc = lu_device_type_init(ldt);
214                 if (rc != 0)
215                         GOTO (failed, rc);
216         }
217
218         spin_lock(&obd_types_lock);
219         list_add(&type->typ_chain, &obd_types);
220         spin_unlock(&obd_types_lock);
221
222         RETURN (0);
223
224 failed:
225         if (type->typ_name != NULL) {
226 #ifdef CONFIG_PROC_FS
227                 if (type->typ_procroot != NULL)
228                         remove_proc_subtree(type->typ_name, proc_lustre_root);
229 #endif
230                 OBD_FREE(type->typ_name, strlen(name) + 1);
231         }
232         if (type->typ_md_ops != NULL)
233                 OBD_FREE_PTR(type->typ_md_ops);
234         if (type->typ_dt_ops != NULL)
235                 OBD_FREE_PTR(type->typ_dt_ops);
236         OBD_FREE(type, sizeof(*type));
237         RETURN(rc);
238 }
239 EXPORT_SYMBOL(class_register_type);
240
241 int class_unregister_type(const char *name)
242 {
243         struct obd_type *type = class_search_type(name);
244         ENTRY;
245
246         if (!type) {
247                 CERROR("unknown obd type\n");
248                 RETURN(-EINVAL);
249         }
250
251         if (type->typ_refcnt) {
252                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
253                 /* This is a bad situation, let's make the best of it */
254                 /* Remove ops, but leave the name for debugging */
255                 OBD_FREE_PTR(type->typ_dt_ops);
256                 OBD_FREE_PTR(type->typ_md_ops);
257                 RETURN(-EBUSY);
258         }
259
260         /* we do not use type->typ_procroot as for compatibility purposes
261          * other modules can share names (i.e. lod can use lov entry). so
262          * we can't reference pointer as it can get invalided when another
263          * module removes the entry */
264 #ifdef CONFIG_PROC_FS
265         if (type->typ_procroot != NULL)
266                 remove_proc_subtree(type->typ_name, proc_lustre_root);
267         if (type->typ_procsym != NULL)
268                 lprocfs_remove(&type->typ_procsym);
269 #endif
270         if (type->typ_lu)
271                 lu_device_type_fini(type->typ_lu);
272
273         spin_lock(&obd_types_lock);
274         list_del(&type->typ_chain);
275         spin_unlock(&obd_types_lock);
276         OBD_FREE(type->typ_name, strlen(name) + 1);
277         if (type->typ_dt_ops != NULL)
278                 OBD_FREE_PTR(type->typ_dt_ops);
279         if (type->typ_md_ops != NULL)
280                 OBD_FREE_PTR(type->typ_md_ops);
281         OBD_FREE(type, sizeof(*type));
282         RETURN(0);
283 } /* class_unregister_type */
284 EXPORT_SYMBOL(class_unregister_type);
285
286 /**
287  * Create a new obd device.
288  *
289  * Find an empty slot in ::obd_devs[], create a new obd device in it.
290  *
291  * \param[in] type_name obd device type string.
292  * \param[in] name      obd device name.
293  *
294  * \retval NULL if create fails, otherwise return the obd device
295  *         pointer created.
296  */
297 struct obd_device *class_newdev(const char *type_name, const char *name)
298 {
299         struct obd_device *result = NULL;
300         struct obd_device *newdev;
301         struct obd_type *type = NULL;
302         int i;
303         int new_obd_minor = 0;
304         ENTRY;
305
306         if (strlen(name) >= MAX_OBD_NAME) {
307                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
308                 RETURN(ERR_PTR(-EINVAL));
309         }
310
311         type = class_get_type(type_name);
312         if (type == NULL){
313                 CERROR("OBD: unknown type: %s\n", type_name);
314                 RETURN(ERR_PTR(-ENODEV));
315         }
316
317         newdev = obd_device_alloc();
318         if (newdev == NULL)
319                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
320
321         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
322
323         write_lock(&obd_dev_lock);
324         for (i = 0; i < class_devno_max(); i++) {
325                 struct obd_device *obd = class_num2obd(i);
326
327                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
328                         CERROR("Device %s already exists at %d, won't add\n",
329                                name, i);
330                         if (result) {
331                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
332                                          "%p obd_magic %08x != %08x\n", result,
333                                          result->obd_magic, OBD_DEVICE_MAGIC);
334                                 LASSERTF(result->obd_minor == new_obd_minor,
335                                          "%p obd_minor %d != %d\n", result,
336                                          result->obd_minor, new_obd_minor);
337
338                                 obd_devs[result->obd_minor] = NULL;
339                                 result->obd_name[0]='\0';
340                          }
341                         result = ERR_PTR(-EEXIST);
342                         break;
343                 }
344                 if (!result && !obd) {
345                         result = newdev;
346                         result->obd_minor = i;
347                         new_obd_minor = i;
348                         result->obd_type = type;
349                         strncpy(result->obd_name, name,
350                                 sizeof(result->obd_name) - 1);
351                         obd_devs[i] = result;
352                 }
353         }
354         write_unlock(&obd_dev_lock);
355
356         if (result == NULL && i >= class_devno_max()) {
357                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
358                        class_devno_max());
359                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
360         }
361
362         if (IS_ERR(result))
363                 GOTO(out, result);
364
365         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
366                result->obd_name, result);
367
368         RETURN(result);
369 out:
370         obd_device_free(newdev);
371 out_type:
372         class_put_type(type);
373         return result;
374 }
375
376 void class_release_dev(struct obd_device *obd)
377 {
378         struct obd_type *obd_type = obd->obd_type;
379
380         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
381                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
382         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
383                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
384         LASSERT(obd_type != NULL);
385
386         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
387                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
388
389         write_lock(&obd_dev_lock);
390         obd_devs[obd->obd_minor] = NULL;
391         write_unlock(&obd_dev_lock);
392         obd_device_free(obd);
393
394         class_put_type(obd_type);
395 }
396
397 int class_name2dev(const char *name)
398 {
399         int i;
400
401         if (!name)
402                 return -1;
403
404         read_lock(&obd_dev_lock);
405         for (i = 0; i < class_devno_max(); i++) {
406                 struct obd_device *obd = class_num2obd(i);
407
408                 if (obd && strcmp(name, obd->obd_name) == 0) {
409                         /* Make sure we finished attaching before we give
410                            out any references */
411                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
412                         if (obd->obd_attached) {
413                                 read_unlock(&obd_dev_lock);
414                                 return i;
415                         }
416                         break;
417                 }
418         }
419         read_unlock(&obd_dev_lock);
420
421         return -1;
422 }
423
424 struct obd_device *class_name2obd(const char *name)
425 {
426         int dev = class_name2dev(name);
427
428         if (dev < 0 || dev > class_devno_max())
429                 return NULL;
430         return class_num2obd(dev);
431 }
432 EXPORT_SYMBOL(class_name2obd);
433
434 int class_uuid2dev(struct obd_uuid *uuid)
435 {
436         int i;
437
438         read_lock(&obd_dev_lock);
439         for (i = 0; i < class_devno_max(); i++) {
440                 struct obd_device *obd = class_num2obd(i);
441
442                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
443                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
444                         read_unlock(&obd_dev_lock);
445                         return i;
446                 }
447         }
448         read_unlock(&obd_dev_lock);
449
450         return -1;
451 }
452
453 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
454 {
455         int dev = class_uuid2dev(uuid);
456         if (dev < 0)
457                 return NULL;
458         return class_num2obd(dev);
459 }
460 EXPORT_SYMBOL(class_uuid2obd);
461
462 /**
463  * Get obd device from ::obd_devs[]
464  *
465  * \param num [in] array index
466  *
467  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
468  *         otherwise return the obd device there.
469  */
470 struct obd_device *class_num2obd(int num)
471 {
472         struct obd_device *obd = NULL;
473
474         if (num < class_devno_max()) {
475                 obd = obd_devs[num];
476                 if (obd == NULL)
477                         return NULL;
478
479                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
480                          "%p obd_magic %08x != %08x\n",
481                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
482                 LASSERTF(obd->obd_minor == num,
483                          "%p obd_minor %0d != %0d\n",
484                          obd, obd->obd_minor, num);
485         }
486
487         return obd;
488 }
489
490 /**
491  * Get obd devices count. Device in any
492  *    state are counted
493  * \retval obd device count
494  */
495 int get_devices_count(void)
496 {
497         int index, max_index = class_devno_max(), dev_count = 0;
498
499         read_lock(&obd_dev_lock);
500         for (index = 0; index <= max_index; index++) {
501                 struct obd_device *obd = class_num2obd(index);
502                 if (obd != NULL)
503                         dev_count++;
504         }
505         read_unlock(&obd_dev_lock);
506
507         return dev_count;
508 }
509 EXPORT_SYMBOL(get_devices_count);
510
511 void class_obd_list(void)
512 {
513         char *status;
514         int i;
515
516         read_lock(&obd_dev_lock);
517         for (i = 0; i < class_devno_max(); i++) {
518                 struct obd_device *obd = class_num2obd(i);
519
520                 if (obd == NULL)
521                         continue;
522                 if (obd->obd_stopping)
523                         status = "ST";
524                 else if (obd->obd_set_up)
525                         status = "UP";
526                 else if (obd->obd_attached)
527                         status = "AT";
528                 else
529                         status = "--";
530                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
531                          i, status, obd->obd_type->typ_name,
532                          obd->obd_name, obd->obd_uuid.uuid,
533                          atomic_read(&obd->obd_refcount));
534         }
535         read_unlock(&obd_dev_lock);
536         return;
537 }
538
539 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
540    specified, then only the client with that uuid is returned,
541    otherwise any client connected to the tgt is returned. */
542 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
543                                           const char * typ_name,
544                                           struct obd_uuid *grp_uuid)
545 {
546         int i;
547
548         read_lock(&obd_dev_lock);
549         for (i = 0; i < class_devno_max(); i++) {
550                 struct obd_device *obd = class_num2obd(i);
551
552                 if (obd == NULL)
553                         continue;
554                 if ((strncmp(obd->obd_type->typ_name, typ_name,
555                              strlen(typ_name)) == 0)) {
556                         if (obd_uuid_equals(tgt_uuid,
557                                             &obd->u.cli.cl_target_uuid) &&
558                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
559                                                          &obd->obd_uuid) : 1)) {
560                                 read_unlock(&obd_dev_lock);
561                                 return obd;
562                         }
563                 }
564         }
565         read_unlock(&obd_dev_lock);
566
567         return NULL;
568 }
569 EXPORT_SYMBOL(class_find_client_obd);
570
571 /* Iterate the obd_device list looking devices have grp_uuid. Start
572    searching at *next, and if a device is found, the next index to look
573    at is saved in *next. If next is NULL, then the first matching device
574    will always be returned. */
575 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
576 {
577         int i;
578
579         if (next == NULL)
580                 i = 0;
581         else if (*next >= 0 && *next < class_devno_max())
582                 i = *next;
583         else
584                 return NULL;
585
586         read_lock(&obd_dev_lock);
587         for (; i < class_devno_max(); i++) {
588                 struct obd_device *obd = class_num2obd(i);
589
590                 if (obd == NULL)
591                         continue;
592                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
593                         if (next != NULL)
594                                 *next = i+1;
595                         read_unlock(&obd_dev_lock);
596                         return obd;
597                 }
598         }
599         read_unlock(&obd_dev_lock);
600
601         return NULL;
602 }
603 EXPORT_SYMBOL(class_devices_in_group);
604
605 /**
606  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
607  * adjust sptlrpc settings accordingly.
608  */
609 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
610 {
611         struct obd_device  *obd;
612         const char         *type;
613         int                 i, rc = 0, rc2;
614
615         LASSERT(namelen > 0);
616
617         read_lock(&obd_dev_lock);
618         for (i = 0; i < class_devno_max(); i++) {
619                 obd = class_num2obd(i);
620
621                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
622                         continue;
623
624                 /* only notify mdc, osc, mdt, ost */
625                 type = obd->obd_type->typ_name;
626                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
627                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
628                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
629                     strcmp(type, LUSTRE_OST_NAME) != 0)
630                         continue;
631
632                 if (strncmp(obd->obd_name, fsname, namelen))
633                         continue;
634
635                 class_incref(obd, __FUNCTION__, obd);
636                 read_unlock(&obd_dev_lock);
637                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
638                                          sizeof(KEY_SPTLRPC_CONF),
639                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
640                 rc = rc ? rc : rc2;
641                 class_decref(obd, __FUNCTION__, obd);
642                 read_lock(&obd_dev_lock);
643         }
644         read_unlock(&obd_dev_lock);
645         return rc;
646 }
647 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
648
649 void obd_cleanup_caches(void)
650 {
651         ENTRY;
652         if (obd_device_cachep) {
653                 kmem_cache_destroy(obd_device_cachep);
654                 obd_device_cachep = NULL;
655         }
656         if (obdo_cachep) {
657                 kmem_cache_destroy(obdo_cachep);
658                 obdo_cachep = NULL;
659         }
660         if (import_cachep) {
661                 kmem_cache_destroy(import_cachep);
662                 import_cachep = NULL;
663         }
664
665         EXIT;
666 }
667
668 int obd_init_caches(void)
669 {
670         int rc;
671         ENTRY;
672
673         LASSERT(obd_device_cachep == NULL);
674         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
675                                               sizeof(struct obd_device),
676                                               0, 0, NULL);
677         if (!obd_device_cachep)
678                 GOTO(out, rc = -ENOMEM);
679
680         LASSERT(obdo_cachep == NULL);
681         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
682                                         0, 0, NULL);
683         if (!obdo_cachep)
684                 GOTO(out, rc = -ENOMEM);
685
686         LASSERT(import_cachep == NULL);
687         import_cachep = kmem_cache_create("ll_import_cache",
688                                           sizeof(struct obd_import),
689                                           0, 0, NULL);
690         if (!import_cachep)
691                 GOTO(out, rc = -ENOMEM);
692
693         RETURN(0);
694 out:
695         obd_cleanup_caches();
696         RETURN(rc);
697 }
698
699 /* map connection to client */
700 struct obd_export *class_conn2export(struct lustre_handle *conn)
701 {
702         struct obd_export *export;
703         ENTRY;
704
705         if (!conn) {
706                 CDEBUG(D_CACHE, "looking for null handle\n");
707                 RETURN(NULL);
708         }
709
710         if (conn->cookie == -1) {  /* this means assign a new connection */
711                 CDEBUG(D_CACHE, "want a new connection\n");
712                 RETURN(NULL);
713         }
714
715         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
716         export = class_handle2object(conn->cookie, NULL);
717         RETURN(export);
718 }
719 EXPORT_SYMBOL(class_conn2export);
720
721 struct obd_device *class_exp2obd(struct obd_export *exp)
722 {
723         if (exp)
724                 return exp->exp_obd;
725         return NULL;
726 }
727 EXPORT_SYMBOL(class_exp2obd);
728
729 struct obd_device *class_conn2obd(struct lustre_handle *conn)
730 {
731         struct obd_export *export;
732         export = class_conn2export(conn);
733         if (export) {
734                 struct obd_device *obd = export->exp_obd;
735                 class_export_put(export);
736                 return obd;
737         }
738         return NULL;
739 }
740
741 struct obd_import *class_exp2cliimp(struct obd_export *exp)
742 {
743         struct obd_device *obd = exp->exp_obd;
744         if (obd == NULL)
745                 return NULL;
746         return obd->u.cli.cl_import;
747 }
748 EXPORT_SYMBOL(class_exp2cliimp);
749
750 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
751 {
752         struct obd_device *obd = class_conn2obd(conn);
753         if (obd == NULL)
754                 return NULL;
755         return obd->u.cli.cl_import;
756 }
757
758 /* Export management functions */
759 static void class_export_destroy(struct obd_export *exp)
760 {
761         struct obd_device *obd = exp->exp_obd;
762         ENTRY;
763
764         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
765         LASSERT(obd != NULL);
766
767         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
768                exp->exp_client_uuid.uuid, obd->obd_name);
769
770         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
771         if (exp->exp_connection)
772                 ptlrpc_put_connection_superhack(exp->exp_connection);
773
774         LASSERT(list_empty(&exp->exp_outstanding_replies));
775         LASSERT(list_empty(&exp->exp_uncommitted_replies));
776         LASSERT(list_empty(&exp->exp_req_replay_queue));
777         LASSERT(list_empty(&exp->exp_hp_rpcs));
778         obd_destroy_export(exp);
779         class_decref(obd, "export", exp);
780
781         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
782         EXIT;
783 }
784
785 static void export_handle_addref(void *export)
786 {
787         class_export_get(export);
788 }
789
790 static struct portals_handle_ops export_handle_ops = {
791         .hop_addref = export_handle_addref,
792         .hop_free   = NULL,
793 };
794
795 struct obd_export *class_export_get(struct obd_export *exp)
796 {
797         atomic_inc(&exp->exp_refcount);
798         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
799                atomic_read(&exp->exp_refcount));
800         return exp;
801 }
802 EXPORT_SYMBOL(class_export_get);
803
804 void class_export_put(struct obd_export *exp)
805 {
806         LASSERT(exp != NULL);
807         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
808         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
809                atomic_read(&exp->exp_refcount) - 1);
810
811         if (atomic_dec_and_test(&exp->exp_refcount)) {
812                 LASSERT(!list_empty(&exp->exp_obd_chain));
813                 CDEBUG(D_IOCTL, "final put %p/%s\n",
814                        exp, exp->exp_client_uuid.uuid);
815
816                 /* release nid stat refererence */
817                 lprocfs_exp_cleanup(exp);
818
819                 obd_zombie_export_add(exp);
820         }
821 }
822 EXPORT_SYMBOL(class_export_put);
823
824 /* Creates a new export, adds it to the hash table, and returns a
825  * pointer to it. The refcount is 2: one for the hash reference, and
826  * one for the pointer returned by this function. */
827 struct obd_export *class_new_export(struct obd_device *obd,
828                                     struct obd_uuid *cluuid)
829 {
830         struct obd_export *export;
831         struct cfs_hash *hash = NULL;
832         int rc = 0;
833         ENTRY;
834
835         OBD_ALLOC_PTR(export);
836         if (!export)
837                 return ERR_PTR(-ENOMEM);
838
839         export->exp_conn_cnt = 0;
840         export->exp_lock_hash = NULL;
841         export->exp_flock_hash = NULL;
842         atomic_set(&export->exp_refcount, 2);
843         atomic_set(&export->exp_rpc_count, 0);
844         atomic_set(&export->exp_cb_count, 0);
845         atomic_set(&export->exp_locks_count, 0);
846 #if LUSTRE_TRACKS_LOCK_EXP_REFS
847         INIT_LIST_HEAD(&export->exp_locks_list);
848         spin_lock_init(&export->exp_locks_list_guard);
849 #endif
850         atomic_set(&export->exp_replay_count, 0);
851         export->exp_obd = obd;
852         INIT_LIST_HEAD(&export->exp_outstanding_replies);
853         spin_lock_init(&export->exp_uncommitted_replies_lock);
854         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
855         INIT_LIST_HEAD(&export->exp_req_replay_queue);
856         INIT_LIST_HEAD(&export->exp_handle.h_link);
857         INIT_LIST_HEAD(&export->exp_hp_rpcs);
858         INIT_LIST_HEAD(&export->exp_reg_rpcs);
859         class_handle_hash(&export->exp_handle, &export_handle_ops);
860         export->exp_last_request_time = cfs_time_current_sec();
861         spin_lock_init(&export->exp_lock);
862         spin_lock_init(&export->exp_rpc_lock);
863         INIT_HLIST_NODE(&export->exp_uuid_hash);
864         INIT_HLIST_NODE(&export->exp_nid_hash);
865         INIT_HLIST_NODE(&export->exp_gen_hash);
866         spin_lock_init(&export->exp_bl_list_lock);
867         INIT_LIST_HEAD(&export->exp_bl_list);
868
869         export->exp_sp_peer = LUSTRE_SP_ANY;
870         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
871         export->exp_client_uuid = *cluuid;
872         obd_init_export(export);
873
874         spin_lock(&obd->obd_dev_lock);
875         /* shouldn't happen, but might race */
876         if (obd->obd_stopping)
877                 GOTO(exit_unlock, rc = -ENODEV);
878
879         hash = cfs_hash_getref(obd->obd_uuid_hash);
880         if (hash == NULL)
881                 GOTO(exit_unlock, rc = -ENODEV);
882         spin_unlock(&obd->obd_dev_lock);
883
884         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
885                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
886                 if (rc != 0) {
887                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
888                                       obd->obd_name, cluuid->uuid, rc);
889                         GOTO(exit_err, rc = -EALREADY);
890                 }
891         }
892
893         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
894         spin_lock(&obd->obd_dev_lock);
895         if (obd->obd_stopping) {
896                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
897                 GOTO(exit_unlock, rc = -ENODEV);
898         }
899
900         class_incref(obd, "export", export);
901         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
902         list_add_tail(&export->exp_obd_chain_timed,
903                       &export->exp_obd->obd_exports_timed);
904         export->exp_obd->obd_num_exports++;
905         spin_unlock(&obd->obd_dev_lock);
906         cfs_hash_putref(hash);
907         RETURN(export);
908
909 exit_unlock:
910         spin_unlock(&obd->obd_dev_lock);
911 exit_err:
912         if (hash)
913                 cfs_hash_putref(hash);
914         class_handle_unhash(&export->exp_handle);
915         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
916         obd_destroy_export(export);
917         OBD_FREE_PTR(export);
918         return ERR_PTR(rc);
919 }
920 EXPORT_SYMBOL(class_new_export);
921
922 void class_unlink_export(struct obd_export *exp)
923 {
924         class_handle_unhash(&exp->exp_handle);
925
926         spin_lock(&exp->exp_obd->obd_dev_lock);
927         /* delete an uuid-export hashitem from hashtables */
928         if (!hlist_unhashed(&exp->exp_uuid_hash))
929                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
930                              &exp->exp_client_uuid,
931                              &exp->exp_uuid_hash);
932
933         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
934         list_del_init(&exp->exp_obd_chain_timed);
935         exp->exp_obd->obd_num_exports--;
936         spin_unlock(&exp->exp_obd->obd_dev_lock);
937         class_export_put(exp);
938 }
939
940 /* Import management functions */
941 static void class_import_destroy(struct obd_import *imp)
942 {
943         ENTRY;
944
945         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
946                 imp->imp_obd->obd_name);
947
948         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
949
950         ptlrpc_put_connection_superhack(imp->imp_connection);
951
952         while (!list_empty(&imp->imp_conn_list)) {
953                 struct obd_import_conn *imp_conn;
954
955                 imp_conn = list_entry(imp->imp_conn_list.next,
956                                       struct obd_import_conn, oic_item);
957                 list_del_init(&imp_conn->oic_item);
958                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
959                 OBD_FREE(imp_conn, sizeof(*imp_conn));
960         }
961
962         LASSERT(imp->imp_sec == NULL);
963         class_decref(imp->imp_obd, "import", imp);
964         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
965         EXIT;
966 }
967
/*
 * Handle-table "addref" callback for imports: takes one reference on the
 * import passed as an untyped pointer.
 */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
972
973 static struct portals_handle_ops import_handle_ops = {
974         .hop_addref = import_handle_addref,
975         .hop_free   = NULL,
976 };
977
978 struct obd_import *class_import_get(struct obd_import *import)
979 {
980         atomic_inc(&import->imp_refcount);
981         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
982                atomic_read(&import->imp_refcount),
983                import->imp_obd->obd_name);
984         return import;
985 }
986 EXPORT_SYMBOL(class_import_get);
987
988 void class_import_put(struct obd_import *imp)
989 {
990         ENTRY;
991
992         LASSERT(list_empty(&imp->imp_zombie_chain));
993         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
994
995         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
996                atomic_read(&imp->imp_refcount) - 1,
997                imp->imp_obd->obd_name);
998
999         if (atomic_dec_and_test(&imp->imp_refcount)) {
1000                 CDEBUG(D_INFO, "final put import %p\n", imp);
1001                 obd_zombie_import_add(imp);
1002         }
1003
1004         /* catch possible import put race */
1005         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1006         EXIT;
1007 }
1008 EXPORT_SYMBOL(class_import_put);
1009
1010 static void init_imp_at(struct imp_at *at) {
1011         int i;
1012         at_init(&at->iat_net_latency, 0, 0);
1013         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1014                 /* max service estimates are tracked on the server side, so
1015                    don't use the AT history here, just use the last reported
1016                    val. (But keep hist for proc histogram, worst_ever) */
1017                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1018                         AT_FLG_NOHIST);
1019         }
1020 }
1021
1022 struct obd_import *class_new_import(struct obd_device *obd)
1023 {
1024         struct obd_import *imp;
1025
1026         OBD_ALLOC(imp, sizeof(*imp));
1027         if (imp == NULL)
1028                 return NULL;
1029
1030         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1031         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1032         INIT_LIST_HEAD(&imp->imp_replay_list);
1033         INIT_LIST_HEAD(&imp->imp_sending_list);
1034         INIT_LIST_HEAD(&imp->imp_delayed_list);
1035         INIT_LIST_HEAD(&imp->imp_committed_list);
1036         imp->imp_replay_cursor = &imp->imp_committed_list;
1037         spin_lock_init(&imp->imp_lock);
1038         imp->imp_last_success_conn = 0;
1039         imp->imp_state = LUSTRE_IMP_NEW;
1040         imp->imp_obd = class_incref(obd, "import", imp);
1041         mutex_init(&imp->imp_sec_mutex);
1042         init_waitqueue_head(&imp->imp_recovery_waitq);
1043
1044         atomic_set(&imp->imp_refcount, 2);
1045         atomic_set(&imp->imp_unregistering, 0);
1046         atomic_set(&imp->imp_inflight, 0);
1047         atomic_set(&imp->imp_replay_inflight, 0);
1048         atomic_set(&imp->imp_inval_count, 0);
1049         INIT_LIST_HEAD(&imp->imp_conn_list);
1050         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1051         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1052         init_imp_at(&imp->imp_at);
1053
1054         /* the default magic is V2, will be used in connect RPC, and
1055          * then adjusted according to the flags in request/reply. */
1056         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1057
1058         return imp;
1059 }
1060 EXPORT_SYMBOL(class_new_import);
1061
1062 void class_destroy_import(struct obd_import *import)
1063 {
1064         LASSERT(import != NULL);
1065         LASSERT(import != LP_POISON);
1066
1067         class_handle_unhash(&import->imp_handle);
1068
1069         spin_lock(&import->imp_lock);
1070         import->imp_generation++;
1071         spin_unlock(&import->imp_lock);
1072         class_import_put(import);
1073 }
1074 EXPORT_SYMBOL(class_destroy_import);
1075
1076 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1077
1078 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1079 {
1080         spin_lock(&exp->exp_locks_list_guard);
1081
1082         LASSERT(lock->l_exp_refs_nr >= 0);
1083
1084         if (lock->l_exp_refs_target != NULL &&
1085             lock->l_exp_refs_target != exp) {
1086                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1087                               exp, lock, lock->l_exp_refs_target);
1088         }
1089         if ((lock->l_exp_refs_nr ++) == 0) {
1090                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1091                 lock->l_exp_refs_target = exp;
1092         }
1093         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1094                lock, exp, lock->l_exp_refs_nr);
1095         spin_unlock(&exp->exp_locks_list_guard);
1096 }
1097
1098 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1099 {
1100         spin_lock(&exp->exp_locks_list_guard);
1101         LASSERT(lock->l_exp_refs_nr > 0);
1102         if (lock->l_exp_refs_target != exp) {
1103                 LCONSOLE_WARN("lock %p, "
1104                               "mismatching export pointers: %p, %p\n",
1105                               lock, lock->l_exp_refs_target, exp);
1106         }
1107         if (-- lock->l_exp_refs_nr == 0) {
1108                 list_del_init(&lock->l_exp_refs_link);
1109                 lock->l_exp_refs_target = NULL;
1110         }
1111         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1112                lock, exp, lock->l_exp_refs_nr);
1113         spin_unlock(&exp->exp_locks_list_guard);
1114 }
1115 #endif
1116
1117 /* A connection defines an export context in which preallocation can
1118    be managed. This releases the export pointer reference, and returns
1119    the export handle, so the export refcount is 1 when this function
1120    returns. */
1121 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1122                   struct obd_uuid *cluuid)
1123 {
1124         struct obd_export *export;
1125         LASSERT(conn != NULL);
1126         LASSERT(obd != NULL);
1127         LASSERT(cluuid != NULL);
1128         ENTRY;
1129
1130         export = class_new_export(obd, cluuid);
1131         if (IS_ERR(export))
1132                 RETURN(PTR_ERR(export));
1133
1134         conn->cookie = export->exp_handle.h_cookie;
1135         class_export_put(export);
1136
1137         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1138                cluuid->uuid, conn->cookie);
1139         RETURN(0);
1140 }
1141 EXPORT_SYMBOL(class_connect);
1142
1143 /* if export is involved in recovery then clean up related things */
1144 static void class_export_recovery_cleanup(struct obd_export *exp)
1145 {
1146         struct obd_device *obd = exp->exp_obd;
1147
1148         spin_lock(&obd->obd_recovery_task_lock);
1149         if (obd->obd_recovering) {
1150                 if (exp->exp_in_recovery) {
1151                         spin_lock(&exp->exp_lock);
1152                         exp->exp_in_recovery = 0;
1153                         spin_unlock(&exp->exp_lock);
1154                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1155                         atomic_dec(&obd->obd_connected_clients);
1156                 }
1157
1158                 /* if called during recovery then should update
1159                  * obd_stale_clients counter,
1160                  * lightweight exports are not counted */
1161                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1162                         exp->exp_obd->obd_stale_clients++;
1163         }
1164         spin_unlock(&obd->obd_recovery_task_lock);
1165
1166         spin_lock(&exp->exp_lock);
1167         /** Cleanup req replay fields */
1168         if (exp->exp_req_replay_needed) {
1169                 exp->exp_req_replay_needed = 0;
1170
1171                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1172                 atomic_dec(&obd->obd_req_replay_clients);
1173         }
1174
1175         /** Cleanup lock replay data */
1176         if (exp->exp_lock_replay_needed) {
1177                 exp->exp_lock_replay_needed = 0;
1178
1179                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1180                 atomic_dec(&obd->obd_lock_replay_clients);
1181         }
1182         spin_unlock(&exp->exp_lock);
1183 }
1184
1185 /* This function removes 1-3 references from the export:
1186  * 1 - for export pointer passed
1187  * and if disconnect really need
1188  * 2 - removing from hash
1189  * 3 - in client_unlink_export
1190  * The export pointer passed to this function can destroyed */
1191 int class_disconnect(struct obd_export *export)
1192 {
1193         int already_disconnected;
1194         ENTRY;
1195
1196         if (export == NULL) {
1197                 CWARN("attempting to free NULL export %p\n", export);
1198                 RETURN(-EINVAL);
1199         }
1200
1201         spin_lock(&export->exp_lock);
1202         already_disconnected = export->exp_disconnected;
1203         export->exp_disconnected = 1;
1204         spin_unlock(&export->exp_lock);
1205
1206         /* class_cleanup(), abort_recovery(), and class_fail_export()
1207          * all end up in here, and if any of them race we shouldn't
1208          * call extra class_export_puts(). */
1209         if (already_disconnected) {
1210                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1211                 GOTO(no_disconn, already_disconnected);
1212         }
1213
1214         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1215                export->exp_handle.h_cookie);
1216
1217         if (!hlist_unhashed(&export->exp_nid_hash))
1218                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1219                              &export->exp_connection->c_peer.nid,
1220                              &export->exp_nid_hash);
1221
1222         class_export_recovery_cleanup(export);
1223         class_unlink_export(export);
1224 no_disconn:
1225         class_export_put(export);
1226         RETURN(0);
1227 }
1228 EXPORT_SYMBOL(class_disconnect);
1229
1230 /* Return non-zero for a fully connected export */
1231 int class_connected_export(struct obd_export *exp)
1232 {
1233         int connected = 0;
1234
1235         if (exp) {
1236                 spin_lock(&exp->exp_lock);
1237                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1238                 spin_unlock(&exp->exp_lock);
1239         }
1240         return connected;
1241 }
1242 EXPORT_SYMBOL(class_connected_export);
1243
1244 static void class_disconnect_export_list(struct list_head *list,
1245                                          enum obd_option flags)
1246 {
1247         int rc;
1248         struct obd_export *exp;
1249         ENTRY;
1250
1251         /* It's possible that an export may disconnect itself, but
1252          * nothing else will be added to this list. */
1253         while (!list_empty(list)) {
1254                 exp = list_entry(list->next, struct obd_export,
1255                                  exp_obd_chain);
1256                 /* need for safe call CDEBUG after obd_disconnect */
1257                 class_export_get(exp);
1258
1259                 spin_lock(&exp->exp_lock);
1260                 exp->exp_flags = flags;
1261                 spin_unlock(&exp->exp_lock);
1262
1263                 if (obd_uuid_equals(&exp->exp_client_uuid,
1264                                     &exp->exp_obd->obd_uuid)) {
1265                         CDEBUG(D_HA,
1266                                "exp %p export uuid == obd uuid, don't discon\n",
1267                                exp);
1268                         /* Need to delete this now so we don't end up pointing
1269                          * to work_list later when this export is cleaned up. */
1270                         list_del_init(&exp->exp_obd_chain);
1271                         class_export_put(exp);
1272                         continue;
1273                 }
1274
1275                 class_export_get(exp);
1276                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1277                        "last request at "CFS_TIME_T"\n",
1278                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1279                        exp, exp->exp_last_request_time);
1280                 /* release one export reference anyway */
1281                 rc = obd_disconnect(exp);
1282
1283                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1284                        obd_export_nid2str(exp), exp, rc);
1285                 class_export_put(exp);
1286         }
1287         EXIT;
1288 }
1289
1290 void class_disconnect_exports(struct obd_device *obd)
1291 {
1292         struct list_head work_list;
1293         ENTRY;
1294
1295         /* Move all of the exports from obd_exports to a work list, en masse. */
1296         INIT_LIST_HEAD(&work_list);
1297         spin_lock(&obd->obd_dev_lock);
1298         list_splice_init(&obd->obd_exports, &work_list);
1299         list_splice_init(&obd->obd_delayed_exports, &work_list);
1300         spin_unlock(&obd->obd_dev_lock);
1301
1302         if (!list_empty(&work_list)) {
1303                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1304                        "disconnecting them\n", obd->obd_minor, obd);
1305                 class_disconnect_export_list(&work_list,
1306                                              exp_flags_from_obd(obd));
1307         } else
1308                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1309                        obd->obd_minor, obd);
1310         EXIT;
1311 }
1312 EXPORT_SYMBOL(class_disconnect_exports);
1313
1314 /* Remove exports that have not completed recovery.
1315  */
1316 void class_disconnect_stale_exports(struct obd_device *obd,
1317                                     int (*test_export)(struct obd_export *))
1318 {
1319         struct list_head work_list;
1320         struct obd_export *exp, *n;
1321         int evicted = 0;
1322         ENTRY;
1323
1324         INIT_LIST_HEAD(&work_list);
1325         spin_lock(&obd->obd_dev_lock);
1326         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1327                                  exp_obd_chain) {
1328                 /* don't count self-export as client */
1329                 if (obd_uuid_equals(&exp->exp_client_uuid,
1330                                     &exp->exp_obd->obd_uuid))
1331                         continue;
1332
1333                 /* don't evict clients which have no slot in last_rcvd
1334                  * (e.g. lightweight connection) */
1335                 if (exp->exp_target_data.ted_lr_idx == -1)
1336                         continue;
1337
1338                 spin_lock(&exp->exp_lock);
1339                 if (exp->exp_failed || test_export(exp)) {
1340                         spin_unlock(&exp->exp_lock);
1341                         continue;
1342                 }
1343                 exp->exp_failed = 1;
1344                 spin_unlock(&exp->exp_lock);
1345
1346                 list_move(&exp->exp_obd_chain, &work_list);
1347                 evicted++;
1348                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1349                        obd->obd_name, exp->exp_client_uuid.uuid,
1350                        exp->exp_connection == NULL ? "<unknown>" :
1351                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1352                 print_export_data(exp, "EVICTING", 0);
1353         }
1354         spin_unlock(&obd->obd_dev_lock);
1355
1356         if (evicted)
1357                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1358                               obd->obd_name, evicted);
1359
1360         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1361                                                  OBD_OPT_ABORT_RECOV);
1362         EXIT;
1363 }
1364 EXPORT_SYMBOL(class_disconnect_stale_exports);
1365
1366 void class_fail_export(struct obd_export *exp)
1367 {
1368         int rc, already_failed;
1369
1370         spin_lock(&exp->exp_lock);
1371         already_failed = exp->exp_failed;
1372         exp->exp_failed = 1;
1373         spin_unlock(&exp->exp_lock);
1374
1375         if (already_failed) {
1376                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1377                        exp, exp->exp_client_uuid.uuid);
1378                 return;
1379         }
1380
1381         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1382                exp, exp->exp_client_uuid.uuid);
1383
1384         if (obd_dump_on_timeout)
1385                 libcfs_debug_dumplog();
1386
1387         /* need for safe call CDEBUG after obd_disconnect */
1388         class_export_get(exp);
1389
1390         /* Most callers into obd_disconnect are removing their own reference
1391          * (request, for example) in addition to the one from the hash table.
1392          * We don't have such a reference here, so make one. */
1393         class_export_get(exp);
1394         rc = obd_disconnect(exp);
1395         if (rc)
1396                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1397         else
1398                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1399                        exp, exp->exp_client_uuid.uuid);
1400         class_export_put(exp);
1401 }
1402 EXPORT_SYMBOL(class_fail_export);
1403
1404 char *obd_export_nid2str(struct obd_export *exp)
1405 {
1406         if (exp->exp_connection != NULL)
1407                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1408
1409         return "(no nid)";
1410 }
1411 EXPORT_SYMBOL(obd_export_nid2str);
1412
1413 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1414 {
1415         struct cfs_hash *nid_hash;
1416         struct obd_export *doomed_exp = NULL;
1417         int exports_evicted = 0;
1418
1419         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1420
1421         spin_lock(&obd->obd_dev_lock);
1422         /* umount has run already, so evict thread should leave
1423          * its task to umount thread now */
1424         if (obd->obd_stopping) {
1425                 spin_unlock(&obd->obd_dev_lock);
1426                 return exports_evicted;
1427         }
1428         nid_hash = obd->obd_nid_hash;
1429         cfs_hash_getref(nid_hash);
1430         spin_unlock(&obd->obd_dev_lock);
1431
1432         do {
1433                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1434                 if (doomed_exp == NULL)
1435                         break;
1436
1437                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1438                          "nid %s found, wanted nid %s, requested nid %s\n",
1439                          obd_export_nid2str(doomed_exp),
1440                          libcfs_nid2str(nid_key), nid);
1441                 LASSERTF(doomed_exp != obd->obd_self_export,
1442                          "self-export is hashed by NID?\n");
1443                 exports_evicted++;
1444                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1445                               "request\n", obd->obd_name,
1446                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1447                               obd_export_nid2str(doomed_exp));
1448                 class_fail_export(doomed_exp);
1449                 class_export_put(doomed_exp);
1450         } while (1);
1451
1452         cfs_hash_putref(nid_hash);
1453
1454         if (!exports_evicted)
1455                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1456                        obd->obd_name, nid);
1457         return exports_evicted;
1458 }
1459 EXPORT_SYMBOL(obd_export_evict_by_nid);
1460
1461 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1462 {
1463         struct cfs_hash *uuid_hash;
1464         struct obd_export *doomed_exp = NULL;
1465         struct obd_uuid doomed_uuid;
1466         int exports_evicted = 0;
1467
1468         spin_lock(&obd->obd_dev_lock);
1469         if (obd->obd_stopping) {
1470                 spin_unlock(&obd->obd_dev_lock);
1471                 return exports_evicted;
1472         }
1473         uuid_hash = obd->obd_uuid_hash;
1474         cfs_hash_getref(uuid_hash);
1475         spin_unlock(&obd->obd_dev_lock);
1476
1477         obd_str2uuid(&doomed_uuid, uuid);
1478         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1479                 CERROR("%s: can't evict myself\n", obd->obd_name);
1480                 cfs_hash_putref(uuid_hash);
1481                 return exports_evicted;
1482         }
1483
1484         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1485
1486         if (doomed_exp == NULL) {
1487                 CERROR("%s: can't disconnect %s: no exports found\n",
1488                        obd->obd_name, uuid);
1489         } else {
1490                 CWARN("%s: evicting %s at adminstrative request\n",
1491                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1492                 class_fail_export(doomed_exp);
1493                 class_export_put(doomed_exp);
1494                 exports_evicted++;
1495         }
1496         cfs_hash_putref(uuid_hash);
1497
1498         return exports_evicted;
1499 }
1500
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Optional callback used by print_export_data() to dump per-export lock
 * information; it is only invoked when non-NULL and when the caller asked
 * for locks to be dumped.
 */
void (*class_export_dump_hook)(struct obd_export *) = NULL;
#endif
1504
1505 static void print_export_data(struct obd_export *exp, const char *status,
1506                               int locks)
1507 {
1508         struct ptlrpc_reply_state *rs;
1509         struct ptlrpc_reply_state *first_reply = NULL;
1510         int nreplies = 0;
1511
1512         spin_lock(&exp->exp_lock);
1513         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1514                             rs_exp_list) {
1515                 if (nreplies == 0)
1516                         first_reply = rs;
1517                 nreplies++;
1518         }
1519         spin_unlock(&exp->exp_lock);
1520
1521         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1522                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1523                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1524                atomic_read(&exp->exp_rpc_count),
1525                atomic_read(&exp->exp_cb_count),
1526                atomic_read(&exp->exp_locks_count),
1527                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1528                nreplies, first_reply, nreplies > 3 ? "..." : "",
1529                exp->exp_last_committed);
1530 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1531         if (locks && class_export_dump_hook != NULL)
1532                 class_export_dump_hook(exp);
1533 #endif
1534 }
1535
1536 void dump_exports(struct obd_device *obd, int locks)
1537 {
1538         struct obd_export *exp;
1539
1540         spin_lock(&obd->obd_dev_lock);
1541         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1542                 print_export_data(exp, "ACTIVE", locks);
1543         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1544                 print_export_data(exp, "UNLINKED", locks);
1545         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1546                 print_export_data(exp, "DELAYED", locks);
1547         spin_unlock(&obd->obd_dev_lock);
1548         spin_lock(&obd_zombie_impexp_lock);
1549         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1550                 print_export_data(exp, "ZOMBIE", locks);
1551         spin_unlock(&obd_zombie_impexp_lock);
1552 }
1553
1554 void obd_exports_barrier(struct obd_device *obd)
1555 {
1556         int waited = 2;
1557         LASSERT(list_empty(&obd->obd_exports));
1558         spin_lock(&obd->obd_dev_lock);
1559         while (!list_empty(&obd->obd_unlinked_exports)) {
1560                 spin_unlock(&obd->obd_dev_lock);
1561                 set_current_state(TASK_UNINTERRUPTIBLE);
1562                 schedule_timeout(cfs_time_seconds(waited));
1563                 if (waited > 5 && IS_PO2(waited)) {
1564                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1565                                       "more than %d seconds. "
1566                                       "The obd refcount = %d. Is it stuck?\n",
1567                                       obd->obd_name, waited,
1568                                       atomic_read(&obd->obd_refcount));
1569                         dump_exports(obd, 1);
1570                 }
1571                 waited *= 2;
1572                 spin_lock(&obd->obd_dev_lock);
1573         }
1574         spin_unlock(&obd->obd_dev_lock);
1575 }
1576 EXPORT_SYMBOL(obd_exports_barrier);
1577
/*
 * Number of zombie imports and exports still waiting to be destroyed.
 * Starts at zero; updated and read under obd_zombie_impexp_lock.
 */
static int zombies_count;
1580
1581 /**
1582  * kill zombie imports and exports
1583  */
1584 void obd_zombie_impexp_cull(void)
1585 {
1586         struct obd_import *import;
1587         struct obd_export *export;
1588         ENTRY;
1589
1590         do {
1591                 spin_lock(&obd_zombie_impexp_lock);
1592
1593                 import = NULL;
1594                 if (!list_empty(&obd_zombie_imports)) {
1595                         import = list_entry(obd_zombie_imports.next,
1596                                             struct obd_import,
1597                                             imp_zombie_chain);
1598                         list_del_init(&import->imp_zombie_chain);
1599                 }
1600
1601                 export = NULL;
1602                 if (!list_empty(&obd_zombie_exports)) {
1603                         export = list_entry(obd_zombie_exports.next,
1604                                             struct obd_export,
1605                                             exp_obd_chain);
1606                         list_del_init(&export->exp_obd_chain);
1607                 }
1608
1609                 spin_unlock(&obd_zombie_impexp_lock);
1610
1611                 if (import != NULL) {
1612                         class_import_destroy(import);
1613                         spin_lock(&obd_zombie_impexp_lock);
1614                         zombies_count--;
1615                         spin_unlock(&obd_zombie_impexp_lock);
1616                 }
1617
1618                 if (export != NULL) {
1619                         class_export_destroy(export);
1620                         spin_lock(&obd_zombie_impexp_lock);
1621                         zombies_count--;
1622                         spin_unlock(&obd_zombie_impexp_lock);
1623                 }
1624
1625                 cond_resched();
1626         } while (import != NULL || export != NULL);
1627         EXIT;
1628 }
1629
1630 static struct completion        obd_zombie_start;
1631 static struct completion        obd_zombie_stop;
1632 static unsigned long            obd_zombie_flags;
1633 static wait_queue_head_t        obd_zombie_waitq;
1634 static pid_t                    obd_zombie_pid;
1635
1636 enum {
1637         OBD_ZOMBIE_STOP         = 0x0001,
1638 };
1639
1640 /**
1641  * check for work for kill zombie import/export thread.
1642  */
1643 static int obd_zombie_impexp_check(void *arg)
1644 {
1645         int rc;
1646
1647         spin_lock(&obd_zombie_impexp_lock);
1648         rc = (zombies_count == 0) &&
1649              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1650         spin_unlock(&obd_zombie_impexp_lock);
1651
1652         RETURN(rc);
1653 }
1654
1655 /**
1656  * Add export to the obd_zombe thread and notify it.
1657  */
1658 static void obd_zombie_export_add(struct obd_export *exp) {
1659         spin_lock(&exp->exp_obd->obd_dev_lock);
1660         LASSERT(!list_empty(&exp->exp_obd_chain));
1661         list_del_init(&exp->exp_obd_chain);
1662         spin_unlock(&exp->exp_obd->obd_dev_lock);
1663         spin_lock(&obd_zombie_impexp_lock);
1664         zombies_count++;
1665         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1666         spin_unlock(&obd_zombie_impexp_lock);
1667
1668         obd_zombie_impexp_notify();
1669 }
1670
1671 /**
1672  * Add import to the obd_zombe thread and notify it.
1673  */
1674 static void obd_zombie_import_add(struct obd_import *imp) {
1675         LASSERT(imp->imp_sec == NULL);
1676         LASSERT(imp->imp_rq_pool == NULL);
1677         spin_lock(&obd_zombie_impexp_lock);
1678         LASSERT(list_empty(&imp->imp_zombie_chain));
1679         zombies_count++;
1680         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1681         spin_unlock(&obd_zombie_impexp_lock);
1682
1683         obd_zombie_impexp_notify();
1684 }
1685
1686 /**
1687  * notify import/export destroy thread about new zombie.
1688  */
1689 static void obd_zombie_impexp_notify(void)
1690 {
1691         /*
1692          * Make sure obd_zomebie_impexp_thread get this notification.
1693          * It is possible this signal only get by obd_zombie_barrier, and
1694          * barrier gulps this notification and sleeps away and hangs ensues
1695          */
1696         wake_up_all(&obd_zombie_waitq);
1697 }
1698
1699 /**
1700  * check whether obd_zombie is idle
1701  */
1702 static int obd_zombie_is_idle(void)
1703 {
1704         int rc;
1705
1706         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1707         spin_lock(&obd_zombie_impexp_lock);
1708         rc = (zombies_count == 0);
1709         spin_unlock(&obd_zombie_impexp_lock);
1710         return rc;
1711 }
1712
1713 /**
1714  * wait when obd_zombie import/export queues become empty
1715  */
1716 void obd_zombie_barrier(void)
1717 {
1718         struct l_wait_info lwi = { 0 };
1719
1720         if (obd_zombie_pid == current_pid())
1721                 /* don't wait for myself */
1722                 return;
1723         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1724 }
1725 EXPORT_SYMBOL(obd_zombie_barrier);
1726
1727
1728 /**
1729  * destroy zombie export/import thread.
1730  */
1731 static int obd_zombie_impexp_thread(void *unused)
1732 {
1733         unshare_fs_struct();
1734         complete(&obd_zombie_start);
1735
1736         obd_zombie_pid = current_pid();
1737
1738         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1739                 struct l_wait_info lwi = { 0 };
1740
1741                 l_wait_event(obd_zombie_waitq,
1742                              !obd_zombie_impexp_check(NULL), &lwi);
1743                 obd_zombie_impexp_cull();
1744
1745                 /*
1746                  * Notify obd_zombie_barrier callers that queues
1747                  * may be empty.
1748                  */
1749                 wake_up(&obd_zombie_waitq);
1750         }
1751
1752         complete(&obd_zombie_stop);
1753
1754         RETURN(0);
1755 }
1756
1757
1758 /**
1759  * start destroy zombie import/export thread
1760  */
1761 int obd_zombie_impexp_init(void)
1762 {
1763         struct task_struct *task;
1764
1765         INIT_LIST_HEAD(&obd_zombie_imports);
1766
1767         INIT_LIST_HEAD(&obd_zombie_exports);
1768         spin_lock_init(&obd_zombie_impexp_lock);
1769         init_completion(&obd_zombie_start);
1770         init_completion(&obd_zombie_stop);
1771         init_waitqueue_head(&obd_zombie_waitq);
1772         obd_zombie_pid = 0;
1773
1774         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1775         if (IS_ERR(task))
1776                 RETURN(PTR_ERR(task));
1777
1778         wait_for_completion(&obd_zombie_start);
1779         RETURN(0);
1780 }
1781 /**
1782  * stop destroy zombie import/export thread
1783  */
1784 void obd_zombie_impexp_stop(void)
1785 {
1786         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1787         obd_zombie_impexp_notify();
1788         wait_for_completion(&obd_zombie_stop);
1789 }
1790
1791 /***** Kernel-userspace comm helpers *******/
1792
1793 /* Get length of entire message, including header */
1794 int kuc_len(int payload_len)
1795 {
1796         return sizeof(struct kuc_hdr) + payload_len;
1797 }
1798 EXPORT_SYMBOL(kuc_len);
1799
1800 /* Get a pointer to kuc header, given a ptr to the payload
1801  * @param p Pointer to payload area
1802  * @returns Pointer to kuc header
1803  */
1804 struct kuc_hdr * kuc_ptr(void *p)
1805 {
1806         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1807         LASSERT(lh->kuc_magic == KUC_MAGIC);
1808         return lh;
1809 }
1810 EXPORT_SYMBOL(kuc_ptr);
1811
1812 /* Test if payload is part of kuc message
1813  * @param p Pointer to payload area
1814  * @returns boolean
1815  */
1816 int kuc_ispayload(void *p)
1817 {
1818         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1819
1820         if (kh->kuc_magic == KUC_MAGIC)
1821                 return 1;
1822         else
1823                 return 0;
1824 }
1825 EXPORT_SYMBOL(kuc_ispayload);
1826
1827 /* Alloc space for a message, and fill in header
1828  * @return Pointer to payload area
1829  */
1830 void *kuc_alloc(int payload_len, int transport, int type)
1831 {
1832         struct kuc_hdr *lh;
1833         int len = kuc_len(payload_len);
1834
1835         OBD_ALLOC(lh, len);
1836         if (lh == NULL)
1837                 return ERR_PTR(-ENOMEM);
1838
1839         lh->kuc_magic = KUC_MAGIC;
1840         lh->kuc_transport = transport;
1841         lh->kuc_msgtype = type;
1842         lh->kuc_msglen = len;
1843
1844         return (void *)(lh + 1);
1845 }
1846 EXPORT_SYMBOL(kuc_alloc);
1847
/* Free a message obtained from kuc_alloc().
 * @param p Pointer to payload area (not to the header)
 * @param payload_len Payload length that was passed to kuc_alloc()
 */
inline void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);
	int total = kuc_len(payload_len);

	OBD_FREE(hdr, total);
}
EXPORT_SYMBOL(kuc_free);
1855
/* Per-waiter record used by obd_get_request_slot() (allocated on its stack)
 * while a thread sleeps for a request slot. */
struct obd_request_slot_waiter {
        /* links the waiter on cli->cl_loi_read_list; unlinked with
         * list_del_init() by whoever grants the slot */
        struct list_head        orsw_entry;
        /* queue the waiting thread sleeps on */
        wait_queue_head_t       orsw_waitq;
        /* when true, obd_get_request_slot() stops waiting and
         * returns -EINTR */
        bool                    orsw_signaled;
};
1861
1862 static bool obd_request_slot_avail(struct client_obd *cli,
1863                                    struct obd_request_slot_waiter *orsw)
1864 {
1865         bool avail;
1866
1867         spin_lock(&cli->cl_loi_list_lock);
1868         avail = !!list_empty(&orsw->orsw_entry);
1869         spin_unlock(&cli->cl_loi_list_lock);
1870
1871         return avail;
1872 };
1873
1874 /*
1875  * For network flow control, the RPC sponsor needs to acquire a credit
1876  * before sending the RPC. The credits count for a connection is defined
1877  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1878  * the subsequent RPC sponsors need to wait until others released their
1879  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1880  */
1881 int obd_get_request_slot(struct client_obd *cli)
1882 {
1883         struct obd_request_slot_waiter   orsw;
1884         struct l_wait_info               lwi;
1885         int                              rc;
1886
1887         spin_lock(&cli->cl_loi_list_lock);
1888         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1889                 cli->cl_r_in_flight++;
1890                 spin_unlock(&cli->cl_loi_list_lock);
1891                 return 0;
1892         }
1893
1894         init_waitqueue_head(&orsw.orsw_waitq);
1895         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1896         orsw.orsw_signaled = false;
1897         spin_unlock(&cli->cl_loi_list_lock);
1898
1899         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1900         rc = l_wait_event(orsw.orsw_waitq,
1901                           obd_request_slot_avail(cli, &orsw) ||
1902                           orsw.orsw_signaled,
1903                           &lwi);
1904
1905         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1906          * freed but other (such as obd_put_request_slot) is using it. */
1907         spin_lock(&cli->cl_loi_list_lock);
1908         if (rc != 0) {
1909                 if (!orsw.orsw_signaled) {
1910                         if (list_empty(&orsw.orsw_entry))
1911                                 cli->cl_r_in_flight--;
1912                         else
1913                                 list_del(&orsw.orsw_entry);
1914                 }
1915         }
1916
1917         if (orsw.orsw_signaled) {
1918                 LASSERT(list_empty(&orsw.orsw_entry));
1919
1920                 rc = -EINTR;
1921         }
1922         spin_unlock(&cli->cl_loi_list_lock);
1923
1924         return rc;
1925 }
1926 EXPORT_SYMBOL(obd_get_request_slot);
1927
1928 void obd_put_request_slot(struct client_obd *cli)
1929 {
1930         struct obd_request_slot_waiter *orsw;
1931
1932         spin_lock(&cli->cl_loi_list_lock);
1933         cli->cl_r_in_flight--;
1934
1935         /* If there is free slot, wakeup the first waiter. */
1936         if (!list_empty(&cli->cl_loi_read_list) &&
1937             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
1938                 orsw = list_entry(cli->cl_loi_read_list.next,
1939                                   struct obd_request_slot_waiter, orsw_entry);
1940                 list_del_init(&orsw->orsw_entry);
1941                 cli->cl_r_in_flight++;
1942                 wake_up(&orsw->orsw_waitq);
1943         }
1944         spin_unlock(&cli->cl_loi_list_lock);
1945 }
1946 EXPORT_SYMBOL(obd_put_request_slot);
1947
1948 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1949 {
1950         return cli->cl_max_rpcs_in_flight;
1951 }
1952 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
1953
1954 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1955 {
1956         struct obd_request_slot_waiter *orsw;
1957         __u32                           old;
1958         int                             diff;
1959         int                             i;
1960         char                            *typ_name;
1961         int                             rc;
1962
1963         if (max > OBD_MAX_RIF_MAX || max < 1)
1964                 return -ERANGE;
1965
1966         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
1967         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
1968                 /* adjust max_mod_rpcs_in_flight to ensure it is always
1969                  * strictly lower that max_rpcs_in_flight */
1970                 if (max < 2) {
1971                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
1972                                "because it must be higher than "
1973                                "max_mod_rpcs_in_flight value",
1974                                cli->cl_import->imp_obd->obd_name);
1975                         return -ERANGE;
1976                 }
1977                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
1978                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
1979                         if (rc != 0)
1980                                 return rc;
1981                 }
1982         }
1983
1984         spin_lock(&cli->cl_loi_list_lock);
1985         old = cli->cl_max_rpcs_in_flight;
1986         cli->cl_max_rpcs_in_flight = max;
1987         diff = max - old;
1988
1989         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
1990         for (i = 0; i < diff; i++) {
1991                 if (list_empty(&cli->cl_loi_read_list))
1992                         break;
1993
1994                 orsw = list_entry(cli->cl_loi_read_list.next,
1995                                   struct obd_request_slot_waiter, orsw_entry);
1996                 list_del_init(&orsw->orsw_entry);
1997                 cli->cl_r_in_flight++;
1998                 wake_up(&orsw->orsw_waitq);
1999         }
2000         spin_unlock(&cli->cl_loi_list_lock);
2001
2002         return 0;
2003 }
2004 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2005
2006 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2007 {
2008         return cli->cl_max_mod_rpcs_in_flight;
2009 }
2010 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2011
2012 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2013 {
2014         struct obd_connect_data *ocd;
2015         __u16 maxmodrpcs;
2016         __u16 prev;
2017
2018         if (max > OBD_MAX_RIF_MAX || max < 1)
2019                 return -ERANGE;
2020
2021         /* cannot exceed or equal max_rpcs_in_flight */
2022         if (max >= cli->cl_max_rpcs_in_flight) {
2023                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2024                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2025                        cli->cl_import->imp_obd->obd_name,
2026                        max, cli->cl_max_rpcs_in_flight);
2027                 return -ERANGE;
2028         }
2029
2030         /* cannot exceed max modify RPCs in flight supported by the server */
2031         ocd = &cli->cl_import->imp_connect_data;
2032         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2033                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2034         else
2035                 maxmodrpcs = 1;
2036         if (max > maxmodrpcs) {
2037                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2038                        "higher than max_mod_rpcs_per_client value (%hu) "
2039                        "returned by the server at connection\n",
2040                        cli->cl_import->imp_obd->obd_name,
2041                        max, maxmodrpcs);
2042                 return -ERANGE;
2043         }
2044
2045         spin_lock(&cli->cl_mod_rpcs_lock);
2046
2047         prev = cli->cl_max_mod_rpcs_in_flight;
2048         cli->cl_max_mod_rpcs_in_flight = max;
2049
2050         /* wakeup waiters if limit has been increased */
2051         if (cli->cl_max_mod_rpcs_in_flight > prev)
2052                 wake_up(&cli->cl_mod_rpcs_waitq);
2053
2054         spin_unlock(&cli->cl_mod_rpcs_lock);
2055
2056         return 0;
2057 }
2058 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2059
2060
2061 #define pct(a, b) (b ? a * 100 / b : 0)
2062 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2063                                struct seq_file *seq)
2064 {
2065         struct timeval now;
2066         unsigned long mod_tot = 0, mod_cum;
2067         int i;
2068
2069         do_gettimeofday(&now);
2070
2071         spin_lock(&cli->cl_mod_rpcs_lock);
2072
2073         seq_printf(seq, "snapshot_time:         %lu.%lu (secs.usecs)\n",
2074                    now.tv_sec, now.tv_usec);
2075         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2076                    cli->cl_mod_rpcs_in_flight);
2077
2078         seq_printf(seq, "\n\t\t\tmodify\n");
2079         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2080
2081         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2082
2083         mod_cum = 0;
2084         for (i = 0; i < OBD_HIST_MAX; i++) {
2085                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2086                 mod_cum += mod;
2087                 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2088                                  i, mod, pct(mod, mod_tot),
2089                                  pct(mod_cum, mod_tot));
2090                 if (mod_cum == mod_tot)
2091                         break;
2092         }
2093
2094         spin_unlock(&cli->cl_mod_rpcs_lock);
2095
2096         return 0;
2097 }
2098 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2099 #undef pct
2100
2101
2102 /* The number of modify RPCs sent in parallel is limited
2103  * because the server has a finite number of slots per client to
2104  * store request result and ensure reply reconstruction when needed.
2105  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2106  * that takes into account server limit and cl_max_rpcs_in_flight
2107  * value.
2108  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2109  * one close request is allowed above the maximum.
2110  */
2111 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2112                                                  bool close_req)
2113 {
2114         bool avail;
2115
2116         /* A slot is available if
2117          * - number of modify RPCs in flight is less than the max
2118          * - it's a close RPC and no other close request is in flight
2119          */
2120         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2121                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2122
2123         return avail;
2124 }
2125
2126 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2127                                          bool close_req)
2128 {
2129         bool avail;
2130
2131         spin_lock(&cli->cl_mod_rpcs_lock);
2132         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2133         spin_unlock(&cli->cl_mod_rpcs_lock);
2134         return avail;
2135 }
2136
2137 /* Get a modify RPC slot from the obd client @cli according
2138  * to the kind of operation @opc that is going to be sent
2139  * and the intent @it of the operation if it applies.
2140  * If the maximum number of modify RPCs in flight is reached
2141  * the thread is put to sleep.
2142  * Returns the tag to be set in the request message. Tag 0
2143  * is reserved for non-modifying requests.
2144  */
2145 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2146                            struct lookup_intent *it)
2147 {
2148         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2149         bool                    close_req = false;
2150         __u16                   i, max;
2151
2152         /* read-only metadata RPCs don't consume a slot on MDT
2153          * for reply reconstruction
2154          */
2155         if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2156                            it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2157                 return 0;
2158
2159         if (opc == MDS_CLOSE)
2160                 close_req = true;
2161
2162         do {
2163                 spin_lock(&cli->cl_mod_rpcs_lock);
2164                 max = cli->cl_max_mod_rpcs_in_flight;
2165                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2166                         /* there is a slot available */
2167                         cli->cl_mod_rpcs_in_flight++;
2168                         if (close_req)
2169                                 cli->cl_close_rpcs_in_flight++;
2170                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2171                                          cli->cl_mod_rpcs_in_flight);
2172                         /* find a free tag */
2173                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2174                                                 max + 1);
2175                         LASSERT(i < OBD_MAX_RIF_MAX);
2176                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2177                         spin_unlock(&cli->cl_mod_rpcs_lock);
2178                         /* tag 0 is reserved for non-modify RPCs */
2179                         return i + 1;
2180                 }
2181                 spin_unlock(&cli->cl_mod_rpcs_lock);
2182
2183                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2184                        "opc %u, max %hu\n",
2185                        cli->cl_import->imp_obd->obd_name, opc, max);
2186
2187                 l_wait_event(cli->cl_mod_rpcs_waitq,
2188                              obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2189         } while (true);
2190 }
2191 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2192
2193 /* Put a modify RPC slot from the obd client @cli according
2194  * to the kind of operation @opc that has been sent and the
2195  * intent @it of the operation if it applies.
2196  */
2197 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2198                           struct lookup_intent *it, __u16 tag)
2199 {
2200         bool                    close_req = false;
2201
2202         if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2203                            it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2204                 return;
2205
2206         if (opc == MDS_CLOSE)
2207                 close_req = true;
2208
2209         spin_lock(&cli->cl_mod_rpcs_lock);
2210         cli->cl_mod_rpcs_in_flight--;
2211         if (close_req)
2212                 cli->cl_close_rpcs_in_flight--;
2213         /* release the tag in the bitmap */
2214         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2215         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2216         spin_unlock(&cli->cl_mod_rpcs_lock);
2217         wake_up(&cli->cl_mod_rpcs_waitq);
2218 }
2219 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2220