Whamcloud - gitweb
LU-17269 obdclass: fix locking for class_register/deregister
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/obdclass/genops.c
32  *
33  * These are the only exported functions, they provide some generic
34  * infrastructure for managing object devices
35  */
36
37 #define DEBUG_SUBSYSTEM S_CLASS
38
39 #include <linux/pid_namespace.h>
40 #include <linux/workqueue.h>
41 #include <lustre_compat.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
47
48 DEFINE_XARRAY_ALLOC(obd_devs);
49 EXPORT_SYMBOL(obd_devs);
50
51 static atomic_t obd_devs_count = ATOMIC_INIT(0);
52
53 static struct kmem_cache *obd_device_cachep;
54 static struct kobj_type class_ktype;
55 static struct workqueue_struct *zombie_wq;
56
57 static void obd_zombie_export_add(struct obd_export *exp);
58 static void obd_zombie_import_add(struct obd_import *imp);
59 static void print_export_data(struct obd_export *exp,
60                               const char *status, int locks, int debug_level);
61
62 static LIST_HEAD(obd_stale_exports);
63 static DEFINE_SPINLOCK(obd_stale_export_lock);
64 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
65
66 static struct obd_device *obd_device_alloc(void)
67 {
68         struct obd_device *obd;
69
70         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
71         if (obd != NULL)
72                 obd->obd_magic = OBD_DEVICE_MAGIC;
73         return obd;
74 }
75
76 static void obd_device_free(struct obd_device *obd)
77 {
78         LASSERT(obd != NULL);
79         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
80                  "obd %px obd_magic %08x != %08x\n",
81                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
82         if (obd->obd_namespace != NULL) {
83                 CERROR("obd %px: namespace %px was not properly cleaned up (obd_force=%d)!\n",
84                        obd, obd->obd_namespace, obd->obd_force);
85                 LBUG();
86         }
87         lu_ref_fini(&obd->obd_reference);
88         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
89 }
90
91 struct obd_type *class_search_type(const char *name)
92 {
93         struct kobject *kobj = kset_find_obj(lustre_kset, name);
94
95         if (kobj && kobj->ktype == &class_ktype)
96                 return container_of(kobj, struct obd_type, typ_kobj);
97
98         kobject_put(kobj);
99         return NULL;
100 }
101 EXPORT_SYMBOL(class_search_type);
102
103 struct obd_type *class_get_type(const char *name)
104 {
105         struct obd_type *type;
106
107         rcu_read_lock();
108         type = class_search_type(name);
109 #ifdef HAVE_MODULE_LOADING_SUPPORT
110         if (!type) {
111                 const char *modname = name;
112
113 #ifdef HAVE_SERVER_SUPPORT
114                 if (strcmp(modname, "obdfilter") == 0)
115                         modname = "ofd";
116
117                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
118                         modname = LUSTRE_OSP_NAME;
119
120                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
121                         modname = LUSTRE_MDT_NAME;
122 #endif /* HAVE_SERVER_SUPPORT */
123
124                 rcu_read_unlock();
125                 if (!request_module("%s", modname)) {
126                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
127                 } else {
128                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
129                                            modname);
130                 }
131                 rcu_read_lock();
132                 type = class_search_type(name);
133         }
134 #endif
135         if (type) {
136                 /*
137                  * Holding rcu_read_lock() matches the synchronize_rcu() call
138                  * in free_module() and ensures that if type->typ_dt_ops is
139                  * not yet NULL, then the module won't be freed until after
140                  * we rcu_read_unlock().
141                  */
142                 const struct obd_ops *dt_ops = READ_ONCE(type->typ_dt_ops);
143
144                 if (dt_ops && try_module_get(dt_ops->o_owner)) {
145                         atomic_inc(&type->typ_refcnt);
146                         /* class_search_type() returned a counted ref, this
147                          * count not needed as we could get it via typ_refcnt
148                          */
149                         kobject_put(&type->typ_kobj);
150                 } else {
151                         kobject_put(&type->typ_kobj);
152                         type = NULL;
153                 }
154         }
155         rcu_read_unlock();
156         return type;
157 }
158 EXPORT_SYMBOL(class_get_type);
159
160 void class_put_type(struct obd_type *type)
161 {
162         LASSERT(type);
163         module_put(type->typ_dt_ops->o_owner);
164         atomic_dec(&type->typ_refcnt);
165 }
166 EXPORT_SYMBOL(class_put_type);
167
168 static void class_sysfs_release(struct kobject *kobj)
169 {
170         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
171
172         debugfs_remove_recursive(type->typ_debugfs_entry);
173         type->typ_debugfs_entry = NULL;
174
175         if (type->typ_lu)
176                 lu_device_type_fini(type->typ_lu);
177
178 #ifdef CONFIG_PROC_FS
179         if (type->typ_name && type->typ_procroot)
180                 remove_proc_subtree(type->typ_name, proc_lustre_root);
181 #endif
182         OBD_FREE(type, sizeof(*type));
183 }
184
185 static struct kobj_type class_ktype = {
186         .sysfs_ops      = &lustre_sysfs_ops,
187         .release        = class_sysfs_release,
188 };
189
#ifdef HAVE_SERVER_SUPPORT
/**
 * Create a placeholder obd_type named \a name.
 *
 * The new type is added to lustre_kset, gets a debugfs directory and,
 * if \a enable_proc is set, a procfs directory. It is flagged with
 * typ_sym_filter so that class_register_type() can later take it over
 * instead of failing with -EEXIST.
 *
 * \param[in] name         type name
 * \param[in] enable_proc  also create the compat procfs directory
 *
 * \retval type              newly created placeholder type
 * \retval ERR_PTR(-EEXIST)  a type with this name already exists
 * \retval ERR_PTR(-ENOMEM)  allocation failed
 * \retval ERR_PTR(errno)    kobject registration failed
 */
struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
{
	struct obd_type *type;
	int rc;

	type = class_search_type(name);
	if (type) {
		kobject_put(&type->typ_kobj);
		return ERR_PTR(-EEXIST);
	}

	OBD_ALLOC(type, sizeof(*type));
	if (!type)
		return ERR_PTR(-ENOMEM);

	type->typ_kobj.kset = lustre_kset;
	rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
				  &lustre_kset->kobj, "%s", name);
	if (rc) {
		/* A failed kobject_init_and_add() still leaves an initialized
		 * kobject behind; dropping its reference runs
		 * class_sysfs_release(), which frees \a type. Returning
		 * without this put would leak the allocation.
		 */
		kobject_put(&type->typ_kobj);
		return ERR_PTR(rc);
	}

	type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
	type->typ_sym_filter = true;

	if (enable_proc) {
		type->typ_procroot = lprocfs_register(name, proc_lustre_root,
						      NULL, NULL);
		if (IS_ERR(type->typ_procroot)) {
			/* a missing proc entry is reported but not fatal */
			CERROR("%s: can't create compat proc entry: %d\n",
			       name, (int)PTR_ERR(type->typ_procroot));
			type->typ_procroot = NULL;
		}
	}

	return type;
}
EXPORT_SYMBOL(class_add_symlinks);
#endif /* HAVE_SERVER_SUPPORT */
231
232 #define CLASS_MAX_NAME 1024
233
234 int class_register_type(const struct obd_ops *dt_ops,
235                         const struct md_ops *md_ops,
236                         bool enable_proc,
237                         const char *name, struct lu_device_type *ldt)
238 {
239         struct obd_type *type;
240         int rc;
241
242         ENTRY;
243         /* sanity check */
244         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
245
246         type = class_search_type(name);
247         if (type) {
248 #ifdef HAVE_SERVER_SUPPORT
249                 if (type->typ_sym_filter)
250                         goto dir_exist;
251 #endif /* HAVE_SERVER_SUPPORT */
252                 kobject_put(&type->typ_kobj);
253                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
254                 RETURN(-EEXIST);
255         }
256
257         OBD_ALLOC(type, sizeof(*type));
258         if (type == NULL)
259                 RETURN(-ENOMEM);
260
261         type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
262         type->typ_kobj.kset = lustre_kset;
263         kobject_init(&type->typ_kobj, &class_ktype);
264 #ifdef HAVE_SERVER_SUPPORT
265 dir_exist:
266 #endif /* HAVE_SERVER_SUPPORT */
267
268         type->typ_dt_ops = dt_ops;
269         type->typ_md_ops = md_ops;
270
271 #ifdef HAVE_SERVER_SUPPORT
272         if (type->typ_sym_filter) {
273                 type->typ_sym_filter = false;
274                 kobject_put(&type->typ_kobj);
275                 goto setup_ldt;
276         }
277 #endif
278 #ifdef CONFIG_PROC_FS
279         if (enable_proc && !type->typ_procroot) {
280                 type->typ_procroot = lprocfs_register(name,
281                                                       proc_lustre_root,
282                                                       NULL, type);
283                 if (IS_ERR(type->typ_procroot)) {
284                         rc = PTR_ERR(type->typ_procroot);
285                         type->typ_procroot = NULL;
286                         GOTO(failed, rc);
287                 }
288         }
289 #endif
290         type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
291
292         rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
293         if (rc)
294                 GOTO(failed, rc);
295 #ifdef HAVE_SERVER_SUPPORT
296 setup_ldt:
297 #endif
298         if (ldt) {
299                 rc = lu_device_type_init(ldt);
300                 smp_store_release(&type->typ_lu, rc ? NULL : ldt);
301                 wake_up_var(&type->typ_lu);
302                 if (rc)
303                         GOTO(failed, rc);
304         }
305
306         RETURN(0);
307
308 failed:
309         kobject_put(&type->typ_kobj);
310
311         RETURN(rc);
312 }
313 EXPORT_SYMBOL(class_register_type);
314
315 int class_unregister_type(const char *name)
316 {
317         struct obd_type *type = class_search_type(name);
318         int rc = 0;
319
320         ENTRY;
321         if (!type) {
322                 CERROR("unknown obd type\n");
323                 RETURN(-EINVAL);
324         }
325
326         /*
327          * Ensure that class_get_type doesn't try to get the module
328          * as it could be freed before the obd_type is released.
329          * synchronize_rcu() will be called before the module
330          * is freed.
331          */
332         type->typ_dt_ops = NULL;
333
334         if (atomic_read(&type->typ_refcnt)) {
335                 CERROR("type %s has refcount (%d)\n", name,
336                        atomic_read(&type->typ_refcnt));
337                 /* This is a bad situation, let's make the best of it */
338                 /* Remove ops, but leave the name for debugging */
339                 type->typ_md_ops = NULL;
340                 GOTO(out_put, rc = -EBUSY);
341         }
342
343         /* Put the final ref */
344         kobject_put(&type->typ_kobj);
345 out_put:
346         /* Put the ref returned by class_search_type() */
347         kobject_put(&type->typ_kobj);
348
349         RETURN(rc);
350 } /* class_unregister_type */
351 EXPORT_SYMBOL(class_unregister_type);
352
353 /**
354  * Create a new obd device.
355  *
356  * Allocate the new obd_device and initialize it.
357  *
358  * \param[in] type_name obd device type string.
359  * \param[in] name      obd device name.
360  * \param[in] uuid      obd device UUID
361  *
362  * \retval newdev         pointer to created obd_device
363  * \retval ERR_PTR(errno) on error
364  */
365 struct obd_device *class_newdev(const char *type_name, const char *name,
366                                 const char *uuid)
367 {
368         struct obd_device *newdev;
369         struct obd_type *type = NULL;
370
371         ENTRY;
372
373         if (strlen(name) >= MAX_OBD_NAME) {
374                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
375                 RETURN(ERR_PTR(-EINVAL));
376         }
377
378         type = class_get_type(type_name);
379         if (type == NULL) {
380                 CERROR("OBD: unknown type: %s\n", type_name);
381                 RETURN(ERR_PTR(-ENODEV));
382         }
383
384         newdev = obd_device_alloc();
385         if (newdev == NULL) {
386                 class_put_type(type);
387                 RETURN(ERR_PTR(-ENOMEM));
388         }
389         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
390         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
391         newdev->obd_type = type;
392         newdev->obd_minor = -1;
393
394         rwlock_init(&newdev->obd_pool_lock);
395         newdev->obd_pool_limit = 0;
396         newdev->obd_pool_slv = 0;
397
398         INIT_LIST_HEAD(&newdev->obd_exports);
399         newdev->obd_num_exports = 0;
400         newdev->obd_grant_check_threshold = 100;
401         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
402         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
403         INIT_LIST_HEAD(&newdev->obd_exports_timed);
404         INIT_LIST_HEAD(&newdev->obd_nid_stats);
405         spin_lock_init(&newdev->obd_nid_lock);
406         spin_lock_init(&newdev->obd_dev_lock);
407         mutex_init(&newdev->obd_dev_mutex);
408         spin_lock_init(&newdev->obd_osfs_lock);
409         /* newdev->obd_osfs_age must be set to a value in the distant
410          * past to guarantee a fresh statfs is fetched on mount.
411          */
412         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
413
414         /* XXX belongs in setup not attach  */
415         init_rwsem(&newdev->obd_observer_link_sem);
416         /* recovery data */
417         spin_lock_init(&newdev->obd_recovery_task_lock);
418         init_waitqueue_head(&newdev->obd_next_transno_waitq);
419         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
420         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
421         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
422         INIT_LIST_HEAD(&newdev->obd_evict_list);
423         INIT_LIST_HEAD(&newdev->obd_lwp_list);
424
425         llog_group_init(&newdev->obd_olg);
426         /* Detach drops this */
427         kref_init(&newdev->obd_refcount);
428         lu_ref_init(&newdev->obd_reference);
429         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
430
431         atomic_set(&newdev->obd_conn_inprogress, 0);
432
433         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
434
435         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
436                newdev->obd_name, newdev);
437
438         return newdev;
439 }
440
441 /**
442  * Free obd device.
443  *
444  * \param[in] obd obd_device to be freed
445  *
446  * \retval none
447  */
448 void class_free_dev(struct obd_device *obd)
449 {
450         struct obd_type *obd_type = obd->obd_type;
451
452         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
453                  "%px obd_magic %08x != %08x\n",
454                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
455         LASSERTF(obd->obd_minor == -1 || class_num2obd(obd->obd_minor) == obd,
456                  "obd %px != obd_devs[%d] %px\n",
457                  obd, obd->obd_minor, class_num2obd(obd->obd_minor));
458         LASSERTF(kref_read(&obd->obd_refcount) == 0,
459                  "obd_refcount should be 0, not %d\n",
460                  kref_read(&obd->obd_refcount));
461         LASSERT(obd_type != NULL);
462
463         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
464                obd->obd_name, obd->obd_type->typ_name);
465
466         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
467                          obd->obd_name, obd->obd_uuid.uuid);
468         if (obd->obd_stopping) {
469                 int err;
470
471                 /* If we're not stopping, we were never set up */
472                 err = obd_cleanup(obd);
473                 if (err)
474                         CERROR("Cleanup %s returned %d\n",
475                                 obd->obd_name, err);
476         }
477
478         obd_device_free(obd);
479
480         class_put_type(obd_type);
481 }
482
483 static int class_name2dev_nolock(const char *name)
484 {
485         struct obd_device *obd = NULL;
486         unsigned long dev_no = 0;
487         int ret;
488
489         if (!name)
490                 return -1;
491
492         obd_device_for_each(dev_no, obd) {
493                 if (strcmp(name, obd->obd_name) == 0) {
494                         /*
495                          * Make sure we finished attaching before we give
496                          * out any references
497                          */
498                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
499                         if (obd->obd_attached) {
500                                 ret = obd->obd_minor;
501                                 return ret;
502                         }
503                         break;
504                 }
505         }
506
507         return -1;
508 }
509
/**
 * Map a device name to its minor number.
 *
 * Runs class_name2dev_nolock() with the device lock held.
 *
 * \param[in] name  device name, may be NULL
 *
 * \retval minor  minor of the matching, attached device
 * \retval -1     no such device
 */
int class_name2dev(const char *name)
{
	int minor;

	obd_device_lock();
	minor = class_name2dev_nolock(name);
	obd_device_unlock();

	return minor;
}
EXPORT_SYMBOL(class_name2dev);
521
522 /**
523  * Unregister obd device.
524  *
525  * Remove an obd from obd_dev
526  *
527  * \param[in] new_obd obd_device to be unregistered
528  *
529  * \retval none
530  */
531 void class_unregister_device(struct obd_device *obd)
532 {
533         obd_device_lock();
534         if (obd->obd_minor >= 0) {
535                 __xa_erase(&obd_devs, obd->obd_minor);
536                 class_decref(obd, "obd_device_list", obd);
537                 obd->obd_minor = -1;
538                 atomic_dec(&obd_devs_count);
539         }
540         obd_device_unlock();
541 }
542
543 /**
544  * Register obd device.
545  *
546  * Add new_obd to obd_devs
547  *
548  * \param[in] new_obd obd_device to be registered
549  *
550  * \retval 0          success
551  * \retval -EEXIST    device with this name is registered
552  */
553 int class_register_device(struct obd_device *new_obd)
554 {
555         int rc = 0;
556         int dev_no = 0;
557
558         if (new_obd == NULL) {
559                 rc = -1;
560                 goto out;
561         }
562
563         /* obd_device waiting to be destroyed by "obd_zombie_impexp_thread" */
564         if (class_name2dev(new_obd->obd_name) != -1)
565                 obd_zombie_barrier();
566
567         obd_device_lock();
568         if (class_name2dev_nolock(new_obd->obd_name) == -1) {
569                 class_incref(new_obd, "obd_device_list", new_obd);
570                 rc = __xa_alloc(&obd_devs, &dev_no, new_obd,
571                                 xa_limit_31b, GFP_ATOMIC);
572
573                 if (rc != 0)
574                         goto out;
575
576                 new_obd->obd_minor = dev_no;
577                 atomic_inc(&obd_devs_count);
578         } else {
579                 rc = -EEXIST;
580         }
581
582 out:
583         obd_device_unlock();
584         RETURN(rc);
585 }
586
587 struct obd_device *class_name2obd(const char *name)
588 {
589         struct obd_device *obd = NULL;
590         unsigned long dev_no = 0;
591
592         if (!name)
593                 return NULL;
594
595         obd_device_lock();
596         obd_device_for_each(dev_no, obd) {
597                 if (strcmp(name, obd->obd_name) == 0) {
598                         /*
599                          * Make sure we finished attaching before we give
600                          * out any references
601                          */
602                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
603                         if (obd->obd_attached)
604                                 break;
605                 }
606         }
607         obd_device_unlock();
608
609         /*
610          * TODO: We give out a reference without class_incref(). This isn't
611          * ideal, but this behavior is identical in previous implementations
612          * of this function.
613          */
614         return obd;
615 }
616 EXPORT_SYMBOL(class_name2obd);
617
618 int class_uuid2dev(struct obd_uuid *uuid)
619 {
620         struct obd_device *obd = NULL;
621         unsigned long dev_no = 0;
622         int ret;
623
624         obd_device_lock();
625         obd_device_for_each(dev_no, obd) {
626                 if (obd_uuid_equals(uuid, &obd->obd_uuid)) {
627                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
628                         ret = obd->obd_minor;
629                         obd_device_unlock();
630                         return ret;
631                 }
632         }
633         obd_device_unlock();
634
635         return -1;
636 }
637 EXPORT_SYMBOL(class_uuid2dev);
638
639 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
640 {
641         struct obd_device *obd = NULL;
642         unsigned long dev_no = 0;
643
644         obd_device_lock();
645         obd_device_for_each(dev_no, obd) {
646                 if (obd_uuid_equals(uuid, &obd->obd_uuid)) {
647                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
648                         break;
649                 }
650         }
651         obd_device_unlock();
652
653         /*
654          * TODO: We give out a reference without class_incref(). This isn't
655          * ideal, but this behavior is identical in previous implementations
656          * of this function.
657          */
658         return obd;
659 }
660 EXPORT_SYMBOL(class_uuid2obd);
661
662 struct obd_device *class_num2obd(int dev_no)
663 {
664         return xa_load(&obd_devs, dev_no);
665 }
666 EXPORT_SYMBOL(class_num2obd);
667
668 /**
669  * Find obd by name or uuid.
670  *
671  * Increment obd's refcount if found.
672  *
673  * \param[in] str obd name or uuid
674  *
675  * \retval NULL    if not found
676  * \retval obd     pointer to found obd_device
677  */
678 struct obd_device *class_str2obd(const char *str)
679 {
680         struct obd_device *obd = NULL;
681         struct obd_uuid uuid;
682         unsigned long dev_no = 0;
683
684         obd_str2uuid(&uuid, str);
685
686         obd_device_lock();
687         obd_device_for_each(dev_no, obd) {
688                 if (obd_uuid_equals(&uuid, &obd->obd_uuid) ||
689                     (strcmp(str, obd->obd_name) == 0)) {
690                         /*
691                          * Make sure we finished attaching before we give
692                          * out any references
693                          */
694                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
695                         if (obd->obd_attached) {
696                                 class_incref(obd, "find", current);
697                                 break;
698                         }
699                         obd_device_unlock();
700                         RETURN(NULL);
701                 }
702         }
703         obd_device_unlock();
704
705         RETURN(obd);
706 }
707 EXPORT_SYMBOL(class_str2obd);
708
709 /**
710  * Get obd devices count. Device in any
711  *    state are counted
712  * \retval obd device count
713  */
714 int class_obd_devs_count(void)
715 {
716         return atomic_read(&obd_devs_count);
717 }
718 EXPORT_SYMBOL(class_obd_devs_count);
719
720 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
721  * specified, then only the client with that uuid is returned,
722  * otherwise any client connected to the tgt is returned.
723  */
724 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
725                                          const char *type_name,
726                                          struct obd_uuid *grp_uuid)
727 {
728         struct obd_device *obd = NULL;
729         unsigned long dev_no = 0;
730
731         obd_device_lock();
732         obd_device_for_each(dev_no, obd) {
733                 if ((strncmp(obd->obd_type->typ_name, type_name,
734                              strlen(type_name)) == 0)) {
735                         if (obd_uuid_equals(tgt_uuid,
736                                             &obd->u.cli.cl_target_uuid) &&
737                             ((grp_uuid) ? obd_uuid_equals(grp_uuid,
738                                                          &obd->obd_uuid) : 1)) {
739                                 obd_device_unlock();
740                                 return obd;
741                         }
742                 }
743         }
744         obd_device_unlock();
745
746         return NULL;
747 }
748 EXPORT_SYMBOL(class_find_client_obd);
749
750 /**
751  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
752  * adjust sptlrpc settings accordingly.
753  */
754 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
755 {
756         struct obd_device *obd = NULL;
757         unsigned long dev_no = 0;
758         const char *type;
759         int rc = 0, rc2;
760
761         LASSERT(namelen > 0);
762
763         obd_device_lock();
764         obd_device_for_each(dev_no, obd) {
765                 if (obd->obd_set_up == 0 || obd->obd_stopping)
766                         continue;
767
768                 /* only notify mdc, osc, osp, lwp, mdt, ost
769                  * because only these have a -sptlrpc llog
770                  */
771                 type = obd->obd_type->typ_name;
772                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
773                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
774                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
775                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
776                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
777                     strcmp(type, LUSTRE_OST_NAME) != 0)
778                         continue;
779
780                 if (strncmp(obd->obd_name, fsname, namelen))
781                         continue;
782
783                 class_incref(obd, __func__, obd);
784                 obd_device_unlock();
785                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
786                                          sizeof(KEY_SPTLRPC_CONF),
787                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
788                 rc = rc ? rc : rc2;
789                 obd_device_lock();
790                 class_decref(obd, __func__, obd);
791         }
792         obd_device_unlock();
793
794         return rc;
795 }
796 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
797
798 void obd_cleanup_caches(void)
799 {
800         ENTRY;
801         if (obd_device_cachep) {
802                 kmem_cache_destroy(obd_device_cachep);
803                 obd_device_cachep = NULL;
804         }
805
806         EXIT;
807 }
808
809 int obd_init_caches(void)
810 {
811         int rc;
812
813         ENTRY;
814
815         LASSERT(obd_device_cachep == NULL);
816         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
817                                 sizeof(struct obd_device),
818                                 0, 0, 0, sizeof(struct obd_device), NULL);
819         if (!obd_device_cachep)
820                 GOTO(out, rc = -ENOMEM);
821
822         RETURN(0);
823 out:
824         obd_cleanup_caches();
825         RETURN(rc);
826 }
827
828 static const char export_handle_owner[] = "export";
829
830 /* map connection to client */
831 struct obd_export *class_conn2export(struct lustre_handle *conn)
832 {
833         struct obd_export *export;
834
835         ENTRY;
836
837         if (!conn) {
838                 CDEBUG(D_CACHE, "looking for null handle\n");
839                 RETURN(NULL);
840         }
841
842         if (conn->cookie == -1) {  /* this means assign a new connection */
843                 CDEBUG(D_CACHE, "want a new connection\n");
844                 RETURN(NULL);
845         }
846
847         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
848         export = class_handle2object(conn->cookie, export_handle_owner);
849         RETURN(export);
850 }
851 EXPORT_SYMBOL(class_conn2export);
852
853 struct obd_device *class_exp2obd(struct obd_export *exp)
854 {
855         if (exp)
856                 return exp->exp_obd;
857         return NULL;
858 }
859 EXPORT_SYMBOL(class_exp2obd);
860
861 struct obd_import *class_exp2cliimp(struct obd_export *exp)
862 {
863         struct obd_device *obd = exp->exp_obd;
864
865         if (obd == NULL)
866                 return NULL;
867         return obd->u.cli.cl_import;
868 }
869 EXPORT_SYMBOL(class_exp2cliimp);
870
871 /* Export management functions */
872 static void class_export_destroy(struct obd_export *exp)
873 {
874         struct obd_device *obd = exp->exp_obd;
875
876         ENTRY;
877
878         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
879         LASSERT(obd != NULL);
880
881         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
882                exp->exp_client_uuid.uuid, obd->obd_name);
883
884         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
885         ptlrpc_connection_put(exp->exp_connection);
886
887         LASSERT(list_empty(&exp->exp_outstanding_replies));
888         LASSERT(list_empty(&exp->exp_uncommitted_replies));
889         LASSERT(list_empty(&exp->exp_req_replay_queue));
890         LASSERT(list_empty(&exp->exp_hp_rpcs));
891         obd_destroy_export(exp);
892         /* self export doesn't hold a reference to an obd, although it
893          * exists until freeing of the obd
894          */
895         if (exp != obd->obd_self_export)
896                 class_decref(obd, "export", exp);
897
898         OBD_FREE_PRE(exp, sizeof(*exp), "kfree_rcu");
899         kfree_rcu(exp, exp_handle.h_rcu);
900         EXIT;
901 }
902
903 struct obd_export *class_export_get(struct obd_export *exp)
904 {
905         refcount_inc(&exp->exp_handle.h_ref);
906         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
907                refcount_read(&exp->exp_handle.h_ref));
908         return exp;
909 }
910 EXPORT_SYMBOL(class_export_get);
911
912 void class_export_put(struct obd_export *exp)
913 {
914         LASSERT(exp != NULL);
915         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
916         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
917         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
918                refcount_read(&exp->exp_handle.h_ref) - 1);
919
920         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
921                 struct obd_device *obd = exp->exp_obd;
922
923                 CDEBUG(D_IOCTL, "final put %p/%s\n",
924                        exp, exp->exp_client_uuid.uuid);
925
926                 /* release nid stat refererence */
927                 lprocfs_exp_cleanup(exp);
928
929                 if (exp == obd->obd_self_export) {
930                         /* self export should be destroyed without zombie
931                          * thread as it doesn't hold a reference to obd and
932                          * doesn't hold any resources
933                          */
934                         class_export_destroy(exp);
935                         /* self export is destroyed, no class ref exist and it
936                          * is safe to free obd
937                          */
938                         class_free_dev(obd);
939                 } else {
940                         LASSERT(!list_empty(&exp->exp_obd_chain));
941                         obd_zombie_export_add(exp);
942                 }
943
944         }
945 }
946 EXPORT_SYMBOL(class_export_put);
947
948 static void obd_zombie_exp_cull(struct work_struct *ws)
949 {
950         struct obd_export *export;
951
952         export = container_of(ws, struct obd_export, exp_zombie_work);
953         class_export_destroy(export);
954         LASSERT(atomic_read(&obd_stale_export_num) > 0);
955         if (atomic_dec_and_test(&obd_stale_export_num))
956                 wake_up_var(&obd_stale_export_num);
957 }
958
959 /* Creates a new export, adds it to the hash table, and returns a
960  * pointer to it. The refcount is 2: one for the hash reference, and
961  * one for the pointer returned by this function.
962  */
963 static struct obd_export *__class_new_export(struct obd_device *obd,
964                                              struct obd_uuid *cluuid,
965                                              bool is_self)
966 {
967         struct obd_export *export;
968         int rc = 0;
969
970         ENTRY;
971
972         OBD_ALLOC_PTR(export);
973         if (!export)
974                 return ERR_PTR(-ENOMEM);
975
976         export->exp_conn_cnt = 0;
977         export->exp_lock_hash = NULL;
978         export->exp_flock_hash = NULL;
979         /* 2 = class_handle_hash + last */
980         refcount_set(&export->exp_handle.h_ref, 2);
981         atomic_set(&export->exp_rpc_count, 0);
982         atomic_set(&export->exp_cb_count, 0);
983         atomic_set(&export->exp_locks_count, 0);
984 #if LUSTRE_TRACKS_LOCK_EXP_REFS
985         INIT_LIST_HEAD(&export->exp_locks_list);
986         spin_lock_init(&export->exp_locks_list_guard);
987 #endif
988         atomic_set(&export->exp_replay_count, 0);
989         export->exp_obd = obd;
990         INIT_LIST_HEAD(&export->exp_outstanding_replies);
991         spin_lock_init(&export->exp_uncommitted_replies_lock);
992         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
993         INIT_LIST_HEAD(&export->exp_req_replay_queue);
994         INIT_HLIST_NODE(&export->exp_handle.h_link);
995         INIT_LIST_HEAD(&export->exp_hp_rpcs);
996         INIT_LIST_HEAD(&export->exp_reg_rpcs);
997         class_handle_hash(&export->exp_handle, export_handle_owner);
998         export->exp_last_request_time = ktime_get_real_seconds();
999         spin_lock_init(&export->exp_lock);
1000         spin_lock_init(&export->exp_rpc_lock);
1001         INIT_HLIST_NODE(&export->exp_gen_hash);
1002         spin_lock_init(&export->exp_bl_list_lock);
1003         INIT_LIST_HEAD(&export->exp_bl_list);
1004         INIT_LIST_HEAD(&export->exp_stale_list);
1005         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1006
1007         export->exp_sp_peer = LUSTRE_SP_ANY;
1008         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1009         export->exp_client_uuid = *cluuid;
1010         obd_init_export(export);
1011
1012         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1013         export->exp_root_fid.f_seq = 0;
1014         export->exp_root_fid.f_oid = 0;
1015         export->exp_root_fid.f_ver = 0;
1016
1017         spin_lock(&obd->obd_dev_lock);
1018         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1019                 /* shouldn't happen, but might race */
1020                 if (obd->obd_stopping)
1021                         GOTO(exit_unlock, rc = -ENODEV);
1022
1023                 rc = obd_uuid_add(obd, export);
1024                 if (rc != 0) {
1025                         LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1026                                       obd->obd_name, cluuid->uuid, rc);
1027                         GOTO(exit_unlock, rc = -EALREADY);
1028                 }
1029         }
1030
1031         if (!is_self) {
1032                 class_incref(obd, "export", export);
1033                 list_add_tail(&export->exp_obd_chain_timed,
1034                               &obd->obd_exports_timed);
1035                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1036                 obd->obd_num_exports++;
1037         } else {
1038                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1039                 INIT_LIST_HEAD(&export->exp_obd_chain);
1040         }
1041         spin_unlock(&obd->obd_dev_lock);
1042         RETURN(export);
1043
1044 exit_unlock:
1045         spin_unlock(&obd->obd_dev_lock);
1046         class_handle_unhash(&export->exp_handle);
1047         obd_destroy_export(export);
1048         OBD_FREE_PTR(export);
1049         return ERR_PTR(rc);
1050 }
1051
1052 struct obd_export *class_new_export(struct obd_device *obd,
1053                                     struct obd_uuid *uuid)
1054 {
1055         return __class_new_export(obd, uuid, false);
1056 }
1057 EXPORT_SYMBOL(class_new_export);
1058
1059 struct obd_export *class_new_export_self(struct obd_device *obd,
1060                                          struct obd_uuid *uuid)
1061 {
1062         return __class_new_export(obd, uuid, true);
1063 }
1064
1065 void class_unlink_export(struct obd_export *exp)
1066 {
1067         class_handle_unhash(&exp->exp_handle);
1068
1069         if (exp->exp_obd->obd_self_export == exp) {
1070                 class_export_put(exp);
1071                 return;
1072         }
1073
1074         spin_lock(&exp->exp_obd->obd_dev_lock);
1075         /* delete an uuid-export hashitem from hashtables */
1076         if (exp != exp->exp_obd->obd_self_export)
1077                 obd_uuid_del(exp->exp_obd, exp);
1078
1079 #ifdef HAVE_SERVER_SUPPORT
1080         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1081                 struct tg_export_data   *ted = &exp->exp_target_data;
1082                 struct cfs_hash         *hash;
1083
1084                 /* Because obd_gen_hash will not be released until
1085                  * class_cleanup(), so hash should never be NULL here
1086                  */
1087                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1088                 LASSERT(hash != NULL);
1089                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1090                              &exp->exp_gen_hash);
1091                 cfs_hash_putref(hash);
1092         }
1093 #endif /* HAVE_SERVER_SUPPORT */
1094
1095         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1096         list_del_init(&exp->exp_obd_chain_timed);
1097         exp->exp_obd->obd_num_exports--;
1098         spin_unlock(&exp->exp_obd->obd_dev_lock);
1099
1100         /* A reference is kept by obd_stale_exports list */
1101         obd_stale_export_put(exp);
1102 }
1103 EXPORT_SYMBOL(class_unlink_export);
1104
1105 /* Import management functions */
1106 static void obd_zombie_import_free(struct obd_import *imp)
1107 {
1108         struct obd_import_conn *imp_conn;
1109
1110         ENTRY;
1111         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1112                imp->imp_obd->obd_name);
1113
1114         LASSERT(refcount_read(&imp->imp_refcount) == 0);
1115
1116         ptlrpc_connection_put(imp->imp_connection);
1117
1118         while ((imp_conn = list_first_entry_or_null(&imp->imp_conn_list,
1119                                                     struct obd_import_conn,
1120                                                     oic_item)) != NULL) {
1121                 list_del_init(&imp_conn->oic_item);
1122                 ptlrpc_connection_put(imp_conn->oic_conn);
1123                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1124         }
1125
1126         LASSERT(imp->imp_sec == NULL);
1127         LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1128                  imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1129         class_decref(imp->imp_obd, "import", imp);
1130         OBD_FREE_PTR(imp);
1131         EXIT;
1132 }
1133
1134 struct obd_import *class_import_get(struct obd_import *import)
1135 {
1136         refcount_inc(&import->imp_refcount);
1137         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1138                refcount_read(&import->imp_refcount),
1139                import->imp_obd->obd_name);
1140         return import;
1141 }
1142 EXPORT_SYMBOL(class_import_get);
1143
1144 void class_import_put(struct obd_import *imp)
1145 {
1146         ENTRY;
1147
1148         LASSERT(refcount_read(&imp->imp_refcount) > 0);
1149
1150         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1151                refcount_read(&imp->imp_refcount) - 1,
1152                imp->imp_obd->obd_name);
1153
1154         if (refcount_dec_and_test(&imp->imp_refcount)) {
1155                 CDEBUG(D_INFO, "final put import %p\n", imp);
1156                 obd_zombie_import_add(imp);
1157         }
1158
1159         EXIT;
1160 }
1161 EXPORT_SYMBOL(class_import_put);
1162
1163 static void init_imp_at(struct imp_at *at)
1164 {
1165         int i;
1166
1167         at_init(&at->iat_net_latency, 0, 0);
1168         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1169                 /* max service estimates are tracked server side, so dont't
1170                  * use AT history here, just use the last reported val. (But
1171                  * keep hist for proc histogram, worst_ever)
1172                  */
1173                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1174                         AT_FLG_NOHIST);
1175         }
1176 }
1177
1178 static void obd_zombie_imp_cull(struct work_struct *ws)
1179 {
1180         struct obd_import *import;
1181
1182         import = container_of(ws, struct obd_import, imp_zombie_work);
1183         obd_zombie_import_free(import);
1184 }
1185
1186 struct obd_import *class_new_import(struct obd_device *obd)
1187 {
1188         struct obd_import *imp;
1189         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1190
1191         OBD_ALLOC(imp, sizeof(*imp));
1192         if (imp == NULL)
1193                 return NULL;
1194
1195         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1196         INIT_LIST_HEAD(&imp->imp_replay_list);
1197         INIT_LIST_HEAD(&imp->imp_sending_list);
1198         INIT_LIST_HEAD(&imp->imp_delayed_list);
1199         INIT_LIST_HEAD(&imp->imp_committed_list);
1200         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1201         imp->imp_known_replied_xid = 0;
1202         imp->imp_replay_cursor = &imp->imp_committed_list;
1203         spin_lock_init(&imp->imp_lock);
1204         imp->imp_last_success_conn = 0;
1205         imp->imp_state = LUSTRE_IMP_NEW;
1206         imp->imp_obd = class_incref(obd, "import", imp);
1207         rwlock_init(&imp->imp_sec_lock);
1208         init_waitqueue_head(&imp->imp_recovery_waitq);
1209         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1210
1211         if (curr_pid_ns && curr_pid_ns->child_reaper)
1212                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1213         else
1214                 imp->imp_sec_refpid = 1;
1215
1216         refcount_set(&imp->imp_refcount, 2);
1217         atomic_set(&imp->imp_unregistering, 0);
1218         atomic_set(&imp->imp_reqs, 0);
1219         atomic_set(&imp->imp_inflight, 0);
1220         atomic_set(&imp->imp_replay_inflight, 0);
1221         init_waitqueue_head(&imp->imp_replay_waitq);
1222         atomic_set(&imp->imp_inval_count, 0);
1223         atomic_set(&imp->imp_waiting, 0);
1224         INIT_LIST_HEAD(&imp->imp_conn_list);
1225         init_imp_at(&imp->imp_at);
1226
1227         /* the default magic is V2, will be used in connect RPC, and
1228          * then adjusted according to the flags in request/reply.
1229          */
1230         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1231
1232         return imp;
1233 }
1234 EXPORT_SYMBOL(class_new_import);
1235
1236 void class_destroy_import(struct obd_import *import)
1237 {
1238         LASSERT(import != NULL);
1239         LASSERT(import != LP_POISON);
1240
1241         spin_lock(&import->imp_lock);
1242         import->imp_generation++;
1243         spin_unlock(&import->imp_lock);
1244         class_import_put(import);
1245 }
1246 EXPORT_SYMBOL(class_destroy_import);
1247
1248 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1249
1250 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1251 {
1252         spin_lock(&exp->exp_locks_list_guard);
1253
1254         LASSERT(lock->l_exp_refs_nr >= 0);
1255
1256         if (lock->l_exp_refs_target != NULL &&
1257             lock->l_exp_refs_target != exp) {
1258                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1259                               exp, lock, lock->l_exp_refs_target);
1260         }
1261         if ((lock->l_exp_refs_nr++) == 0) {
1262                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1263                 lock->l_exp_refs_target = exp;
1264         }
1265         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1266                lock, exp, lock->l_exp_refs_nr);
1267         spin_unlock(&exp->exp_locks_list_guard);
1268 }
1269 EXPORT_SYMBOL(__class_export_add_lock_ref);
1270
1271 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1272 {
1273         spin_lock(&exp->exp_locks_list_guard);
1274         LASSERT(lock->l_exp_refs_nr > 0);
1275         if (lock->l_exp_refs_target != exp) {
1276                 LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
1277                               lock, lock->l_exp_refs_target, exp);
1278         }
1279         if (-- lock->l_exp_refs_nr == 0) {
1280                 list_del_init(&lock->l_exp_refs_link);
1281                 lock->l_exp_refs_target = NULL;
1282         }
1283         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1284                lock, exp, lock->l_exp_refs_nr);
1285         spin_unlock(&exp->exp_locks_list_guard);
1286 }
1287 EXPORT_SYMBOL(__class_export_del_lock_ref);
1288 #endif
1289
1290 /* A connection defines an export context in which preallocation can be
1291  * managed. This releases the export pointer reference, and returns the export
1292  * handle, so the export refcount is 1 when this function returns.
1293  */
1294 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1295                   struct obd_uuid *cluuid)
1296 {
1297         struct obd_export *export;
1298
1299         LASSERT(conn != NULL);
1300         LASSERT(obd != NULL);
1301         LASSERT(cluuid != NULL);
1302         ENTRY;
1303
1304         export = class_new_export(obd, cluuid);
1305         if (IS_ERR(export))
1306                 RETURN(PTR_ERR(export));
1307
1308         conn->cookie = export->exp_handle.h_cookie;
1309         class_export_put(export);
1310
1311         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1312                cluuid->uuid, conn->cookie);
1313         RETURN(0);
1314 }
1315 EXPORT_SYMBOL(class_connect);
1316
1317 /* if export is involved in recovery then clean up related things */
1318 static void class_export_recovery_cleanup(struct obd_export *exp)
1319 {
1320         struct obd_device *obd = exp->exp_obd;
1321
1322         spin_lock(&obd->obd_recovery_task_lock);
1323         if (obd->obd_recovering) {
1324                 if (exp->exp_in_recovery) {
1325                         spin_lock(&exp->exp_lock);
1326                         exp->exp_in_recovery = 0;
1327                         spin_unlock(&exp->exp_lock);
1328                         LASSERT(atomic_read(&(obd)->obd_connected_clients) > 0);
1329                         atomic_dec(&obd->obd_connected_clients);
1330                 }
1331
1332                 /* if called during recovery then should update
1333                  * obd_stale_clients counter, lightweight exports is not counted
1334                  */
1335                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1336                         exp->exp_obd->obd_stale_clients++;
1337         }
1338         spin_unlock(&obd->obd_recovery_task_lock);
1339
1340         spin_lock(&exp->exp_lock);
1341         /** Cleanup req replay fields */
1342         if (exp->exp_req_replay_needed) {
1343                 exp->exp_req_replay_needed = 0;
1344
1345                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1346                 atomic_dec(&obd->obd_req_replay_clients);
1347         }
1348
1349         /** Cleanup lock replay data */
1350         if (exp->exp_lock_replay_needed) {
1351                 exp->exp_lock_replay_needed = 0;
1352
1353                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1354                 atomic_dec(&obd->obd_lock_replay_clients);
1355         }
1356         spin_unlock(&exp->exp_lock);
1357 }
1358
1359 /* This function removes 1-3 references from the export:
1360  * 1 - for export pointer passed
1361  * and if disconnect really need
1362  * 2 - removing from hash
1363  * 3 - in client_unlink_export
1364  * The export pointer passed to this function can destroyed
1365  */
1366 int class_disconnect(struct obd_export *export)
1367 {
1368         int already_disconnected;
1369
1370         ENTRY;
1371
1372         if (export == NULL) {
1373                 CWARN("attempting to free NULL export %p\n", export);
1374                 RETURN(-EINVAL);
1375         }
1376
1377         spin_lock(&export->exp_lock);
1378         already_disconnected = export->exp_disconnected;
1379         export->exp_disconnected = 1;
1380 #ifdef HAVE_SERVER_SUPPORT
1381         /*  We hold references of export for uuid hash and nid_hash and export
1382          *  link at least. So it is safe to call rh*table_remove_fast in there.
1383          */
1384         obd_nid_del(export->exp_obd, export);
1385 #endif /* HAVE_SERVER_SUPPORT */
1386         spin_unlock(&export->exp_lock);
1387
1388         /* class_cleanup(), abort_recovery(), and class_fail_export() all end up
1389          * here, and any of them race we shouldn't call extra class_export_puts
1390          */
1391         if (already_disconnected)
1392                 GOTO(no_disconn, already_disconnected);
1393
1394         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1395                export->exp_handle.h_cookie);
1396
1397         class_export_recovery_cleanup(export);
1398         class_unlink_export(export);
1399 no_disconn:
1400         class_export_put(export);
1401         RETURN(0);
1402 }
1403 EXPORT_SYMBOL(class_disconnect);
1404
1405 /* Return non-zero for a fully connected export */
1406 int class_connected_export(struct obd_export *exp)
1407 {
1408         int connected = 0;
1409
1410         if (exp) {
1411                 spin_lock(&exp->exp_lock);
1412                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1413                 spin_unlock(&exp->exp_lock);
1414         }
1415         return connected;
1416 }
1417 EXPORT_SYMBOL(class_connected_export);
1418
1419 static void class_disconnect_export_list(struct list_head *list,
1420                                          enum obd_option flags)
1421 {
1422         int rc;
1423         struct obd_export *exp;
1424
1425         ENTRY;
1426
1427         /* It's possible that an export may disconnect itself, but
1428          * nothing else will be added to this list.
1429          */
1430         while ((exp = list_first_entry_or_null(list, struct obd_export,
1431                                                exp_obd_chain)) != NULL) {
1432                 /* need for safe call CDEBUG after obd_disconnect */
1433                 class_export_get(exp);
1434
1435                 spin_lock(&exp->exp_lock);
1436                 exp->exp_flags = flags;
1437                 spin_unlock(&exp->exp_lock);
1438
1439                 if (obd_uuid_equals(&exp->exp_client_uuid,
1440                                     &exp->exp_obd->obd_uuid)) {
1441                         CDEBUG(D_HA,
1442                                "exp %p export uuid == obd uuid, don't discon\n",
1443                                exp);
1444                         /* Need to delete this now so we don't end up pointing
1445                          * to work_list later when this export is cleaned up.
1446                          */
1447                         list_del_init(&exp->exp_obd_chain);
1448                         class_export_put(exp);
1449                         continue;
1450                 }
1451
1452                 class_export_get(exp);
1453                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), last request at %lld\n",
1454                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1455                        exp, exp->exp_last_request_time);
1456                 /* release one export reference anyway */
1457                 rc = obd_disconnect(exp);
1458
1459                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1460                        obd_export_nid2str(exp), exp, rc);
1461                 class_export_put(exp);
1462         }
1463         EXIT;
1464 }
1465
1466 void class_disconnect_exports(struct obd_device *obd)
1467 {
1468         LIST_HEAD(work_list);
1469
1470         ENTRY;
1471
1472         /* Move all of the exports from obd_exports to a work list, en masse. */
1473         spin_lock(&obd->obd_dev_lock);
1474         list_splice_init(&obd->obd_exports, &work_list);
1475         list_splice_init(&obd->obd_delayed_exports, &work_list);
1476         spin_unlock(&obd->obd_dev_lock);
1477
1478         if (!list_empty(&work_list)) {
1479                 CDEBUG(D_HA, "OBD device %d (%p) has exports, disconnecting them\n",
1480                        obd->obd_minor, obd);
1481                 class_disconnect_export_list(&work_list,
1482                                              exp_flags_from_obd(obd));
1483         } else
1484                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1485                        obd->obd_minor, obd);
1486         EXIT;
1487 }
1488 EXPORT_SYMBOL(class_disconnect_exports);
1489
1490 /* Remove exports that have not completed recovery.
1491  */
1492 void class_disconnect_stale_exports(struct obd_device *obd,
1493                                     int (*test_export)(struct obd_export *))
1494 {
1495         LIST_HEAD(work_list);
1496         struct obd_export *exp, *n;
1497         int evicted = 0;
1498
1499         ENTRY;
1500
1501         spin_lock(&obd->obd_dev_lock);
1502         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1503                                  exp_obd_chain) {
1504                 /* don't count self-export as client */
1505                 if (obd_uuid_equals(&exp->exp_client_uuid,
1506                                     &exp->exp_obd->obd_uuid))
1507                         continue;
1508
1509                 /* don't evict clients which have no slot in last_rcvd
1510                  * (e.g. lightweight connection)
1511                  */
1512                 if (exp->exp_target_data.ted_lr_idx == -1)
1513                         continue;
1514
1515                 spin_lock(&exp->exp_lock);
1516                 if (exp->exp_failed || test_export(exp)) {
1517                         spin_unlock(&exp->exp_lock);
1518                         continue;
1519                 }
1520                 exp->exp_failed = 1;
1521                 atomic_inc(&exp->exp_obd->obd_eviction_count);
1522                 spin_unlock(&exp->exp_lock);
1523
1524                 list_move(&exp->exp_obd_chain, &work_list);
1525                 evicted++;
1526                 CWARN("%s: disconnect stale client %s@%s\n",
1527                       obd->obd_name, exp->exp_client_uuid.uuid,
1528                       obd_export_nid2str(exp));
1529                 print_export_data(exp, "EVICTING", 0, D_HA);
1530         }
1531         spin_unlock(&obd->obd_dev_lock);
1532
1533         if (evicted)
1534                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1535                               obd->obd_name, evicted);
1536
1537         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1538                                                  OBD_OPT_ABORT_RECOV);
1539         EXIT;
1540 }
1541 EXPORT_SYMBOL(class_disconnect_stale_exports);
1542
1543 void class_fail_export(struct obd_export *exp)
1544 {
1545         int rc, already_failed;
1546
1547         spin_lock(&exp->exp_lock);
1548         already_failed = exp->exp_failed;
1549         exp->exp_failed = 1;
1550         spin_unlock(&exp->exp_lock);
1551
1552         if (already_failed) {
1553                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1554                        exp, exp->exp_client_uuid.uuid);
1555                 return;
1556         }
1557
1558         atomic_inc(&exp->exp_obd->obd_eviction_count);
1559
1560         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1561                exp, exp->exp_client_uuid.uuid);
1562
1563         if (obd_dump_on_timeout)
1564                 libcfs_debug_dumplog();
1565
1566         /* need for safe call CDEBUG after obd_disconnect */
1567         class_export_get(exp);
1568
1569         /* Callers into obd_disconnect are removing their own ref(eg request) in
1570          * addition to one from hash table. We don't have such a ref so make one
1571          */
1572         class_export_get(exp);
1573         rc = obd_disconnect(exp);
1574         if (rc)
1575                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1576         else
1577                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1578                        exp, exp->exp_client_uuid.uuid);
1579         class_export_put(exp);
1580 }
1581 EXPORT_SYMBOL(class_fail_export);
1582
1583 #ifdef HAVE_SERVER_SUPPORT
1584
1585 static int take_first(struct obd_export *exp, void *data)
1586 {
1587         struct obd_export **expp = data;
1588
1589         if (*expp)
1590                 /* already have one */
1591                 return 0;
1592         if (exp->exp_failed)
1593                 /* Don't want this one */
1594                 return 0;
1595         if (!refcount_inc_not_zero(&exp->exp_handle.h_ref))
1596                 /* Cannot get a ref on this one */
1597                 return 0;
1598         *expp = exp;
1599         return 1;
1600 }
1601
1602 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1603 {
1604         struct lnet_nid nid_key;
1605         struct obd_export *doomed_exp;
1606         int exports_evicted = 0;
1607
1608         libcfs_strnid(&nid_key, nid);
1609
1610         spin_lock(&obd->obd_dev_lock);
1611         /* umount already run. evict thread should stop leaving unmount thread
1612          * to take over
1613          */
1614         if (obd->obd_stopping) {
1615                 spin_unlock(&obd->obd_dev_lock);
1616                 return exports_evicted;
1617         }
1618         spin_unlock(&obd->obd_dev_lock);
1619
1620         doomed_exp = NULL;
1621         while (obd_nid_export_for_each(obd, &nid_key,
1622                                        take_first, &doomed_exp) > 0) {
1623
1624                 LASSERTF(doomed_exp != obd->obd_self_export,
1625                          "self-export is hashed by NID?\n");
1626
1627                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1628                               obd->obd_name,
1629                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1630                               obd_export_nid2str(doomed_exp));
1631
1632                 class_fail_export(doomed_exp);
1633                 class_export_put(doomed_exp);
1634                 exports_evicted++;
1635                 doomed_exp = NULL;
1636         }
1637
1638         if (!exports_evicted)
1639                 CDEBUG(D_HA,
1640                        "%s: can't disconnect NID '%s': no exports found\n",
1641                        obd->obd_name, nid);
1642         return exports_evicted;
1643 }
1644 EXPORT_SYMBOL(obd_export_evict_by_nid);
1645
1646 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1647 {
1648         struct obd_export *doomed_exp = NULL;
1649         struct obd_uuid doomed_uuid;
1650         int exports_evicted = 0;
1651
1652         spin_lock(&obd->obd_dev_lock);
1653         if (obd->obd_stopping) {
1654                 spin_unlock(&obd->obd_dev_lock);
1655                 return exports_evicted;
1656         }
1657         spin_unlock(&obd->obd_dev_lock);
1658
1659         obd_str2uuid(&doomed_uuid, uuid);
1660         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1661                 CERROR("%s: can't evict myself\n", obd->obd_name);
1662                 return exports_evicted;
1663         }
1664
1665         doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1666         if (doomed_exp == NULL) {
1667                 CERROR("%s: can't disconnect %s: no exports found\n",
1668                        obd->obd_name, uuid);
1669         } else {
1670                 CWARN("%s: evicting %s at adminstrative request\n",
1671                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1672                 class_fail_export(doomed_exp);
1673                 class_export_put(doomed_exp);
1674                 obd_uuid_del(obd, doomed_exp);
1675                 exports_evicted++;
1676         }
1677
1678         return exports_evicted;
1679 }
1680 #endif /* HAVE_SERVER_SUPPORT */
1681
1682 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback invoked by print_export_data() for each export when
 * lock dumping is requested; unset (NULL) by default.
 */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
1685 #endif
1686
1687 static void print_export_data(struct obd_export *exp, const char *status,
1688                               int locks, int debug_level)
1689 {
1690         struct ptlrpc_reply_state *rs;
1691         struct ptlrpc_reply_state *first_reply = NULL;
1692         int nreplies = 0;
1693
1694         spin_lock(&exp->exp_lock);
1695         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1696                             rs_exp_list) {
1697                 if (nreplies == 0)
1698                         first_reply = rs;
1699                 nreplies++;
1700         }
1701         spin_unlock(&exp->exp_lock);
1702
1703         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s %llu stale:%d\n",
1704                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1705                obd_export_nid2str(exp),
1706                refcount_read(&exp->exp_handle.h_ref),
1707                atomic_read(&exp->exp_rpc_count),
1708                atomic_read(&exp->exp_cb_count),
1709                atomic_read(&exp->exp_locks_count),
1710                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1711                nreplies, first_reply, nreplies > 3 ? "..." : "",
1712                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1713 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1714         if (locks && class_export_dump_hook != NULL)
1715                 class_export_dump_hook(exp);
1716 #endif
1717 }
1718
1719 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1720 {
1721         struct obd_export *exp;
1722
1723         spin_lock(&obd->obd_dev_lock);
1724         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1725                 print_export_data(exp, "ACTIVE", locks, debug_level);
1726         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1727                 print_export_data(exp, "UNLINKED", locks, debug_level);
1728         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1729                 print_export_data(exp, "DELAYED", locks, debug_level);
1730         spin_unlock(&obd->obd_dev_lock);
1731 }
1732
1733 void obd_exports_barrier(struct obd_device *obd)
1734 {
1735         int waited = 2;
1736
1737         LASSERT(list_empty(&obd->obd_exports));
1738         spin_lock(&obd->obd_dev_lock);
1739         while (!list_empty(&obd->obd_unlinked_exports)) {
1740                 spin_unlock(&obd->obd_dev_lock);
1741                 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1742                 if (waited > 5 && is_power_of_2(waited)) {
1743                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports more than %d seconds. The obd refcount = %d. Is it stuck?\n",
1744                                       obd->obd_name, waited,
1745                                       kref_read(&obd->obd_refcount));
1746                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1747                 }
1748                 waited *= 2;
1749                 spin_lock(&obd->obd_dev_lock);
1750         }
1751         spin_unlock(&obd->obd_dev_lock);
1752 }
1753 EXPORT_SYMBOL(obd_exports_barrier);
1754
1755 /* Add export to the obd_zombe thread and notify it. */
1756 static void obd_zombie_export_add(struct obd_export *exp)
1757 {
1758         atomic_inc(&obd_stale_export_num);
1759         spin_lock(&exp->exp_obd->obd_dev_lock);
1760         LASSERT(!list_empty(&exp->exp_obd_chain));
1761         list_del_init(&exp->exp_obd_chain);
1762         spin_unlock(&exp->exp_obd->obd_dev_lock);
1763         queue_work(zombie_wq, &exp->exp_zombie_work);
1764 }
1765
1766 /* Add import to the obd_zombe thread and notify it. */
1767 static void obd_zombie_import_add(struct obd_import *imp)
1768 {
1769         LASSERT(imp->imp_sec == NULL);
1770
1771         queue_work(zombie_wq, &imp->imp_zombie_work);
1772 }
1773
1774 /* wait when obd_zombie import/export queues become empty */
1775 void obd_zombie_barrier(void)
1776 {
1777         wait_var_event(&obd_stale_export_num,
1778                         atomic_read(&obd_stale_export_num) == 0);
1779         flush_workqueue(zombie_wq);
1780 }
1781 EXPORT_SYMBOL(obd_zombie_barrier);
1782
1783
1784 struct obd_export *obd_stale_export_get(void)
1785 {
1786         struct obd_export *exp = NULL;
1787
1788         ENTRY;
1789
1790         spin_lock(&obd_stale_export_lock);
1791         if (!list_empty(&obd_stale_exports)) {
1792                 exp = list_first_entry(&obd_stale_exports,
1793                                        struct obd_export, exp_stale_list);
1794                 list_del_init(&exp->exp_stale_list);
1795         }
1796         spin_unlock(&obd_stale_export_lock);
1797
1798         if (exp) {
1799                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1800                        atomic_read(&obd_stale_export_num));
1801         }
1802         RETURN(exp);
1803 }
1804 EXPORT_SYMBOL(obd_stale_export_get);
1805
1806 void obd_stale_export_put(struct obd_export *exp)
1807 {
1808         ENTRY;
1809
1810         LASSERT(list_empty(&exp->exp_stale_list));
1811         if (exp->exp_lock_hash &&
1812             atomic_read(&exp->exp_lock_hash->hs_count)) {
1813                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1814                        atomic_read(&obd_stale_export_num));
1815
1816                 spin_lock_bh(&exp->exp_bl_list_lock);
1817                 spin_lock(&obd_stale_export_lock);
1818                 /* Add to the tail if there is no blocked locks,
1819                  * to the head otherwise.
1820                  */
1821                 if (list_empty(&exp->exp_bl_list))
1822                         list_add_tail(&exp->exp_stale_list,
1823                                       &obd_stale_exports);
1824                 else
1825                         list_add(&exp->exp_stale_list,
1826                                  &obd_stale_exports);
1827
1828                 spin_unlock(&obd_stale_export_lock);
1829                 spin_unlock_bh(&exp->exp_bl_list_lock);
1830         } else {
1831                 class_export_put(exp);
1832         }
1833         EXIT;
1834 }
1835 EXPORT_SYMBOL(obd_stale_export_put);
1836
1837 /**
1838  * Adjust the position of the export in the stale list,
1839  * i.e. move to the head of the list if is needed.
1840  **/
1841 void obd_stale_export_adjust(struct obd_export *exp)
1842 {
1843         LASSERT(exp != NULL);
1844         spin_lock_bh(&exp->exp_bl_list_lock);
1845         spin_lock(&obd_stale_export_lock);
1846
1847         if (!list_empty(&exp->exp_stale_list) &&
1848             !list_empty(&exp->exp_bl_list))
1849                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1850
1851         spin_unlock(&obd_stale_export_lock);
1852         spin_unlock_bh(&exp->exp_bl_list_lock);
1853 }
1854 EXPORT_SYMBOL(obd_stale_export_adjust);
1855
1856 /* start destroy zombie import/export thread */
1857 int obd_zombie_impexp_init(void)
1858 {
1859         zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
1860                                            0, CFS_CPT_ANY,
1861                                            cfs_cpt_number(cfs_cpt_tab));
1862
1863         return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
1864 }
1865
1866 /* stop destroy zombie import/export thread */
1867 void obd_zombie_impexp_stop(void)
1868 {
1869         destroy_workqueue(zombie_wq);
1870         LASSERT(list_empty(&obd_stale_exports));
1871 }
1872
1873 /***** Kernel-userspace comm helpers *******/
1874
1875 /* Get length of entire message, including header */
1876 int kuc_len(int payload_len)
1877 {
1878         return sizeof(struct kuc_hdr) + payload_len;
1879 }
1880 EXPORT_SYMBOL(kuc_len);
1881
1882 /* Get a pointer to kuc header, given a ptr to the payload
1883  * @param p Pointer to payload area
1884  * @returns Pointer to kuc header
1885  */
1886 struct kuc_hdr *kuc_ptr(void *p)
1887 {
1888         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1889
1890         LASSERT(lh->kuc_magic == KUC_MAGIC);
1891         return lh;
1892 }
1893 EXPORT_SYMBOL(kuc_ptr);
1894
1895 /* Alloc space for a message, and fill in header
1896  * @return Pointer to payload area
1897  */
1898 void *kuc_alloc(int payload_len, int transport, int type)
1899 {
1900         struct kuc_hdr *lh;
1901         int len = kuc_len(payload_len);
1902
1903         OBD_ALLOC(lh, len);
1904         if (lh == NULL)
1905                 return ERR_PTR(-ENOMEM);
1906
1907         lh->kuc_magic = KUC_MAGIC;
1908         lh->kuc_transport = transport;
1909         lh->kuc_msgtype = type;
1910         lh->kuc_msglen = len;
1911
1912         return (void *)(lh + 1);
1913 }
1914 EXPORT_SYMBOL(kuc_alloc);
1915
/* Free a message obtained from kuc_alloc().  @p is the payload pointer
 * and @payload_len the payload size, from which the full allocation
 * length is recomputed.
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh;

	lh = kuc_ptr(p);
	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1924
1925 struct obd_request_slot_waiter {
1926         struct list_head        orsw_entry;
1927         wait_queue_head_t       orsw_waitq;
1928         bool                    orsw_signaled;
1929 };
1930
1931 static bool obd_request_slot_avail(struct client_obd *cli,
1932                                    struct obd_request_slot_waiter *orsw)
1933 {
1934         bool avail;
1935
1936         spin_lock(&cli->cl_loi_list_lock);
1937         avail = !!list_empty(&orsw->orsw_entry);
1938         spin_unlock(&cli->cl_loi_list_lock);
1939
1940         return avail;
1941 };
1942
1943 /*
1944  * For network flow control, the RPC sponsor needs to acquire a credit
1945  * before sending the RPC. The credits count for a connection is defined
1946  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1947  * the subsequent RPC sponsors need to wait until others released their
1948  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1949  */
1950 int obd_get_request_slot(struct client_obd *cli)
1951 {
1952         struct obd_request_slot_waiter   orsw;
1953         int                              rc;
1954
1955         spin_lock(&cli->cl_loi_list_lock);
1956         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
1957                 cli->cl_rpcs_in_flight++;
1958                 spin_unlock(&cli->cl_loi_list_lock);
1959                 return 0;
1960         }
1961
1962         init_waitqueue_head(&orsw.orsw_waitq);
1963         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
1964         orsw.orsw_signaled = false;
1965         spin_unlock(&cli->cl_loi_list_lock);
1966
1967         rc = l_wait_event_abortable(orsw.orsw_waitq,
1968                                     obd_request_slot_avail(cli, &orsw) ||
1969                                     orsw.orsw_signaled);
1970
1971         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1972          * freed but other (such as obd_put_request_slot) is using it.
1973          */
1974         spin_lock(&cli->cl_loi_list_lock);
1975         if (rc != 0) {
1976                 if (!orsw.orsw_signaled) {
1977                         if (list_empty(&orsw.orsw_entry))
1978                                 cli->cl_rpcs_in_flight--;
1979                         else
1980                                 list_del(&orsw.orsw_entry);
1981                 }
1982                 rc = -EINTR;
1983         }
1984
1985         if (orsw.orsw_signaled) {
1986                 LASSERT(list_empty(&orsw.orsw_entry));
1987
1988                 rc = -EINTR;
1989         }
1990         spin_unlock(&cli->cl_loi_list_lock);
1991
1992         return rc;
1993 }
1994 EXPORT_SYMBOL(obd_get_request_slot);
1995
1996 void obd_put_request_slot(struct client_obd *cli)
1997 {
1998         struct obd_request_slot_waiter *orsw;
1999
2000         spin_lock(&cli->cl_loi_list_lock);
2001         cli->cl_rpcs_in_flight--;
2002
2003         /* If there is free slot, wakeup the first waiter. */
2004         if (!list_empty(&cli->cl_flight_waiters) &&
2005             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2006                 orsw = list_first_entry(&cli->cl_flight_waiters,
2007                                         struct obd_request_slot_waiter,
2008                                         orsw_entry);
2009                 list_del_init(&orsw->orsw_entry);
2010                 cli->cl_rpcs_in_flight++;
2011                 wake_up(&orsw->orsw_waitq);
2012         }
2013         spin_unlock(&cli->cl_loi_list_lock);
2014 }
2015 EXPORT_SYMBOL(obd_put_request_slot);
2016
2017 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2018 {
2019         return cli->cl_max_rpcs_in_flight;
2020 }
2021 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2022
2023 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2024 {
2025         struct obd_request_slot_waiter *orsw;
2026         __u32                           old;
2027         int                             diff;
2028         int                             i;
2029         int                             rc;
2030
2031         if (max > OBD_MAX_RIF_MAX || max < 1)
2032                 return -ERANGE;
2033
2034         CDEBUG(D_INFO, "%s: max = %u max_mod = %u rif = %u\n",
2035                cli->cl_import->imp_obd->obd_name, max,
2036                cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
2037
2038         if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
2039                    LUSTRE_MDC_NAME) == 0) {
2040                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2041                  * strictly lower that max_rpcs_in_flight
2042                  */
2043                 if (max < 2) {
2044                         CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2045                                cli->cl_import->imp_obd->obd_name);
2046                         return -ERANGE;
2047                 }
2048                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2049                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2050                         if (rc != 0)
2051                                 return rc;
2052                 }
2053         }
2054
2055         spin_lock(&cli->cl_loi_list_lock);
2056         old = cli->cl_max_rpcs_in_flight;
2057         cli->cl_max_rpcs_in_flight = max;
2058         client_adjust_max_dirty(cli);
2059
2060         diff = max - old;
2061
2062         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2063         for (i = 0; i < diff; i++) {
2064                 orsw = list_first_entry_or_null(&cli->cl_loi_read_list,
2065                                                 struct obd_request_slot_waiter,
2066                                                 orsw_entry);
2067                 if (!orsw)
2068                         break;
2069
2070                 list_del_init(&orsw->orsw_entry);
2071                 cli->cl_rpcs_in_flight++;
2072                 wake_up(&orsw->orsw_waitq);
2073         }
2074         spin_unlock(&cli->cl_loi_list_lock);
2075
2076         return 0;
2077 }
2078 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2079
2080 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2081 {
2082         return cli->cl_max_mod_rpcs_in_flight;
2083 }
2084 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2085
2086 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2087 {
2088         struct obd_connect_data *ocd;
2089         __u16 maxmodrpcs;
2090         __u16 prev;
2091
2092         if (max > OBD_MAX_RIF_MAX || max < 1)
2093                 return -ERANGE;
2094
2095         ocd = &cli->cl_import->imp_connect_data;
2096         CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
2097                cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
2098                ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
2099
2100         if (max == OBD_MAX_RIF_MAX)
2101                 max = OBD_MAX_RIF_MAX - 1;
2102
2103         /* Cannot exceed or equal max_rpcs_in_flight.  If we are asked to
2104          * increase this value, also bump up max_rpcs_in_flight to match.
2105          */
2106         if (max >= cli->cl_max_rpcs_in_flight) {
2107                 CDEBUG(D_INFO,
2108                        "%s: increasing max_rpcs_in_flight=%u to allow larger max_mod_rpcs_in_flight=%u\n",
2109                        cli->cl_import->imp_obd->obd_name, max + 1, max);
2110                 obd_set_max_rpcs_in_flight(cli, max + 1);
2111         }
2112
2113         /* cannot exceed max modify RPCs in flight supported by the server,
2114          * but verify ocd_connect_flags is at least initialized first.  If
2115          * not, allow it and fix value later in ptlrpc_connect_set_flags().
2116          */
2117         if (!ocd->ocd_connect_flags) {
2118                 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2119         } else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
2120                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2121                 if (maxmodrpcs == 0) { /* connection not finished yet */
2122                         maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2123                         CDEBUG(D_INFO,
2124                                "%s: partial connect, assume maxmodrpcs=%hu\n",
2125                                cli->cl_import->imp_obd->obd_name, maxmodrpcs);
2126                 }
2127         } else {
2128                 maxmodrpcs = 1;
2129         }
2130         if (max > maxmodrpcs) {
2131                 CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than mdt.*.max_mod_rpcs_in_flight=%hu returned by the MDT server at connection.\n",
2132                        cli->cl_import->imp_obd->obd_name,
2133                        max, maxmodrpcs);
2134                 return -ERANGE;
2135         }
2136
2137         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2138
2139         prev = cli->cl_max_mod_rpcs_in_flight;
2140         cli->cl_max_mod_rpcs_in_flight = max;
2141
2142         /* wakeup waiters if limit has been increased */
2143         if (cli->cl_max_mod_rpcs_in_flight > prev)
2144                 wake_up_locked(&cli->cl_mod_rpcs_waitq);
2145
2146         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2147
2148         return 0;
2149 }
2150 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2151
2152 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2153                                struct seq_file *seq)
2154 {
2155         unsigned long mod_tot = 0, mod_cum;
2156         int i;
2157
2158         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2159         lprocfs_stats_header(seq, ktime_get_real(), cli->cl_mod_rpcs_init, 25,
2160                              ":", true, "");
2161         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2162                    cli->cl_mod_rpcs_in_flight);
2163
2164         seq_puts(seq, "\n\t\t\tmodify\n");
2165         seq_puts(seq, "rpcs in flight        rpcs   %% cum %%\n");
2166
2167         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2168
2169         mod_cum = 0;
2170         for (i = 0; i < OBD_HIST_MAX; i++) {
2171                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2172
2173                 mod_cum += mod;
2174                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2175                            i, mod, pct(mod, mod_tot),
2176                            pct(mod_cum, mod_tot));
2177                 if (mod_cum == mod_tot)
2178                         break;
2179         }
2180
2181         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2182
2183         return 0;
2184 }
2185 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2186
2187 /* The number of modify RPCs sent in parallel is limited
2188  * because the server has a finite number of slots per client to
2189  * store request result and ensure reply reconstruction when needed.
2190  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2191  * that takes into account server limit and cl_max_rpcs_in_flight
2192  * value.
2193  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2194  * one close request is allowed above the maximum.
2195  */
2196 struct mod_waiter {
2197         struct client_obd *cli;
2198         bool close_req;
2199         bool woken;
2200         wait_queue_entry_t wqe;
2201 };
2202 static int claim_mod_rpc_function(wait_queue_entry_t *wq_entry,
2203                                   unsigned int mode, int flags, void *key)
2204 {
2205         struct mod_waiter *w = container_of(wq_entry, struct mod_waiter, wqe);
2206         struct client_obd *cli = w->cli;
2207         bool close_req = w->close_req;
2208         bool avail;
2209         int ret;
2210
2211         /* As woken_wake_function() doesn't remove us from the wait_queue,
2212          * we use own flag to ensure we're called just once.
2213          */
2214         if (w->woken)
2215                 return 0;
2216
2217         /* A slot is available if
2218          * - number of modify RPCs in flight is less than the max
2219          * - it's a close RPC and no other close request is in flight
2220          */
2221         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2222                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2223         if (avail) {
2224                 cli->cl_mod_rpcs_in_flight++;
2225                 if (close_req)
2226                         cli->cl_close_rpcs_in_flight++;
2227                 ret = woken_wake_function(wq_entry, mode, flags, key);
2228                 w->woken = true;
2229         } else if (cli->cl_close_rpcs_in_flight)
2230                 /* No other waiter could be woken */
2231                 ret = -1;
2232         else if (!key)
2233                 /* This was not a wakeup from a close completion or a new close
2234                  * being queued, so there is no point seeing if there are close
2235                  * waiters to be woken.
2236                  */
2237                 ret = -1;
2238         else
2239                 /* There might be be a close we could wake, keep looking */
2240                 ret = 0;
2241         return ret;
2242 }
2243
2244 /* Get a modify RPC slot from the obd client @cli according
2245  * to the kind of operation @opc that is going to be sent
2246  * and the intent @it of the operation if it applies.
2247  * If the maximum number of modify RPCs in flight is reached
2248  * the thread is put to sleep.
2249  * Returns the tag to be set in the request message. Tag 0
2250  * is reserved for non-modifying requests.
2251  */
2252 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2253 {
2254         struct mod_waiter wait = {
2255                 .cli = cli,
2256                 .close_req = (opc == MDS_CLOSE),
2257                 .woken = false,
2258         };
2259         __u16                   i, max;
2260
2261         init_wait(&wait.wqe);
2262         wait.wqe.func = claim_mod_rpc_function;
2263
2264         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2265         __add_wait_queue_entry_tail(&cli->cl_mod_rpcs_waitq, &wait.wqe);
2266         /* This wakeup will only succeed if the maximums haven't
2267          * been reached.  If that happens, wait.woken will be set
2268          * and there will be no need to wait.
2269          * If a close_req was enqueue, ensure we search all the way to the
2270          * end of the waitqueue for a close request.
2271          */
2272         __wake_up_locked_key(&cli->cl_mod_rpcs_waitq, TASK_NORMAL,
2273                              (void*)wait.close_req);
2274
2275         while (wait.woken == false) {
2276                 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2277                 wait_woken(&wait.wqe, TASK_UNINTERRUPTIBLE,
2278                            MAX_SCHEDULE_TIMEOUT);
2279                 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2280         }
2281         __remove_wait_queue(&cli->cl_mod_rpcs_waitq, &wait.wqe);
2282
2283         max = cli->cl_max_mod_rpcs_in_flight;
2284         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2285                          cli->cl_mod_rpcs_in_flight);
2286         /* find a free tag */
2287         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2288                                 max + 1);
2289         LASSERT(i < OBD_MAX_RIF_MAX);
2290         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2291         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2292         /* tag 0 is reserved for non-modify RPCs */
2293
2294         CDEBUG(D_RPCTRACE,
2295                "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2296                cli->cl_import->imp_obd->obd_name,
2297                i + 1, opc, max);
2298
2299         return i + 1;
2300 }
2301 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2302
2303 /* Put a modify RPC slot from the obd client @cli according
2304  * to the kind of operation @opc that has been sent.
2305  */
2306 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2307 {
2308         bool                    close_req = false;
2309
2310         if (tag == 0)
2311                 return;
2312
2313         if (opc == MDS_CLOSE)
2314                 close_req = true;
2315
2316         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2317         cli->cl_mod_rpcs_in_flight--;
2318         if (close_req)
2319                 cli->cl_close_rpcs_in_flight--;
2320         /* release the tag in the bitmap */
2321         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2322         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2323         __wake_up_locked_key(&cli->cl_mod_rpcs_waitq, TASK_NORMAL,
2324                              (void *)close_req);
2325         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2326 }
2327 EXPORT_SYMBOL(obd_put_mod_rpc_slot);