Whamcloud - gitweb
ff1d7b4fc8ea5d324778c9b792d0cabd12b1a6f9
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/obdclass/genops.c
32  *
33  * These are the only exported functions, they provide some generic
34  * infrastructure for managing object devices
35  */
36
37 #define DEBUG_SUBSYSTEM S_CLASS
38
39 #include <linux/pid_namespace.h>
40 #include <linux/workqueue.h>
41 #include <lustre_compat.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
47
/* Registry of obd devices; entries are indexed by the device minor number
 * handed out in class_register_device().
 */
DEFINE_XARRAY_ALLOC(obd_devs);
EXPORT_SYMBOL(obd_devs);

/* Number of devices currently stored in obd_devs (see
 * class_register_device() / class_unregister_device()).
 */
static atomic_t obd_devs_count = ATOMIC_INIT(0);

/* Slab cache backing obd_device_alloc() / obd_device_free(). */
static struct kmem_cache *obd_device_cachep;
/* Forward declaration; the initializer appears after class_sysfs_release(). */
static struct kobj_type class_ktype;
/* NOTE(review): presumably the workqueue that runs zombie export/import
 * cleanup work; its users are not visible in this part of the file.
 */
static struct workqueue_struct *zombie_wq;

static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks, int debug_level);

/* NOTE(review): list of stale exports and the lock presumably guarding it;
 * confirm against the users further down the file.
 */
static LIST_HEAD(obd_stale_exports);
static DEFINE_SPINLOCK(obd_stale_export_lock);
/* Decremented by obd_zombie_exp_cull() after each export is destroyed;
 * waiters on this variable are woken when it reaches zero.
 */
static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
65
66 static struct obd_device *obd_device_alloc(void)
67 {
68         struct obd_device *obd;
69
70         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
71         if (obd != NULL) {
72                 obd->obd_magic = OBD_DEVICE_MAGIC;
73         }
74         return obd;
75 }
76
77 static void obd_device_free(struct obd_device *obd)
78 {
79         LASSERT(obd != NULL);
80         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
81                  "obd %p obd_magic %08x != %08x\n",
82                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
83         if (obd->obd_namespace != NULL) {
84                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
85                        obd, obd->obd_namespace, obd->obd_force);
86                 LBUG();
87         }
88         lu_ref_fini(&obd->obd_reference);
89         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
90 }
91
92 struct obd_type *class_search_type(const char *name)
93 {
94         struct kobject *kobj = kset_find_obj(lustre_kset, name);
95
96         if (kobj && kobj->ktype == &class_ktype)
97                 return container_of(kobj, struct obd_type, typ_kobj);
98
99         kobject_put(kobj);
100         return NULL;
101 }
102 EXPORT_SYMBOL(class_search_type);
103
104 struct obd_type *class_get_type(const char *name)
105 {
106         struct obd_type *type;
107
108         type = class_search_type(name);
109 #ifdef HAVE_MODULE_LOADING_SUPPORT
110         if (!type) {
111                 const char *modname = name;
112
113 #ifdef HAVE_SERVER_SUPPORT
114                 if (strcmp(modname, "obdfilter") == 0)
115                         modname = "ofd";
116
117                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
118                         modname = LUSTRE_OSP_NAME;
119
120                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
121                         modname = LUSTRE_MDT_NAME;
122 #endif /* HAVE_SERVER_SUPPORT */
123
124                 if (!request_module("%s", modname)) {
125                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
126                         type = class_search_type(name);
127                 } else {
128                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
129                                            modname);
130                 }
131         }
132 #endif
133         if (type) {
134                 if (try_module_get(type->typ_dt_ops->o_owner)) {
135                         atomic_inc(&type->typ_refcnt);
136                         /* class_search_type() returned a counted reference,
137                          * but we don't need that count any more as
138                          * we have one through typ_refcnt.
139                          */
140                         kobject_put(&type->typ_kobj);
141                 } else {
142                         kobject_put(&type->typ_kobj);
143                         type = NULL;
144                 }
145         }
146         return type;
147 }
148 EXPORT_SYMBOL(class_get_type);
149
150 void class_put_type(struct obd_type *type)
151 {
152         LASSERT(type);
153         module_put(type->typ_dt_ops->o_owner);
154         atomic_dec(&type->typ_refcnt);
155 }
156 EXPORT_SYMBOL(class_put_type);
157
158 static void class_sysfs_release(struct kobject *kobj)
159 {
160         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
161
162         debugfs_remove_recursive(type->typ_debugfs_entry);
163         type->typ_debugfs_entry = NULL;
164
165         if (type->typ_lu)
166                 lu_device_type_fini(type->typ_lu);
167
168 #ifdef CONFIG_PROC_FS
169         if (type->typ_name && type->typ_procroot)
170                 remove_proc_subtree(type->typ_name, proc_lustre_root);
171 #endif
172         OBD_FREE(type, sizeof(*type));
173 }
174
175 static struct kobj_type class_ktype = {
176         .sysfs_ops      = &lustre_sysfs_ops,
177         .release        = class_sysfs_release,
178 };
179
180 #ifdef HAVE_SERVER_SUPPORT
181 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
182 {
183         struct dentry *symlink;
184         struct obd_type *type;
185         int rc;
186
187         type = class_search_type(name);
188         if (type) {
189                 kobject_put(&type->typ_kobj);
190                 return ERR_PTR(-EEXIST);
191         }
192
193         OBD_ALLOC(type, sizeof(*type));
194         if (!type)
195                 return ERR_PTR(-ENOMEM);
196
197         type->typ_kobj.kset = lustre_kset;
198         rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
199                                   &lustre_kset->kobj, "%s", name);
200         if (rc)
201                 return ERR_PTR(rc);
202
203         symlink = debugfs_create_dir(name, debugfs_lustre_root);
204         type->typ_debugfs_entry = symlink;
205         type->typ_sym_filter = true;
206
207         if (enable_proc) {
208                 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
209                                                       NULL, NULL);
210                 if (IS_ERR(type->typ_procroot)) {
211                         CERROR("%s: can't create compat proc entry: %d\n",
212                                name, (int)PTR_ERR(type->typ_procroot));
213                         type->typ_procroot = NULL;
214                 }
215         }
216
217         return type;
218 }
219 EXPORT_SYMBOL(class_add_symlinks);
220 #endif /* HAVE_SERVER_SUPPORT */
221
222 #define CLASS_MAX_NAME 1024
223
/*
 * Register an obd type under lustre_kset.
 *
 * dt_ops/md_ops are stored in the type.  When enable_proc is set (and
 * CONFIG_PROC_FS is defined) a procfs directory is registered; a debugfs
 * directory is always created.  If ldt is given, lu_device_type_init() is
 * run and the result is published through type->typ_lu.
 *
 * With HAVE_SERVER_SUPPORT, a type that already exists with typ_sym_filter
 * set (as created by class_add_symlinks()) is reused rather than rejected.
 *
 * Returns 0 on success, -EEXIST if the name is already registered, -ENOMEM
 * on allocation failure, or the error from lprocfs_register(),
 * kobject_add() or lu_device_type_init().
 */
int class_register_type(const struct obd_ops *dt_ops,
                        const struct md_ops *md_ops,
                        bool enable_proc,
                        const char *name, struct lu_device_type *ldt)
{
        struct obd_type *type;
        int rc;

        ENTRY;
        /* sanity check */
        LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);

        type = class_search_type(name);
        if (type) {
#ifdef HAVE_SERVER_SUPPORT
                /* Placeholder type: skip allocation and finish it below.
                 * The lookup reference is still held at this point.
                 */
                if (type->typ_sym_filter)
                        goto dir_exist;
#endif /* HAVE_SERVER_SUPPORT */
                kobject_put(&type->typ_kobj);
                CDEBUG(D_IOCTL, "Type %s already registered\n", name);
                RETURN(-EEXIST);
        }

        OBD_ALLOC(type, sizeof(*type));
        if (type == NULL)
                RETURN(-ENOMEM);

        /* Mark typ_lu as "setup in progress" until lu_device_type_init()
         * has run; the final value is stored and waiters woken below.
         */
        type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
        type->typ_kobj.kset = lustre_kset;
        kobject_init(&type->typ_kobj, &class_ktype);
#ifdef HAVE_SERVER_SUPPORT
dir_exist:
#endif /* HAVE_SERVER_SUPPORT */

        type->typ_dt_ops = dt_ops;
        type->typ_md_ops = md_ops;

#ifdef HAVE_SERVER_SUPPORT
        if (type->typ_sym_filter) {
                /* The placeholder already has its kobject and directories:
                 * clear the flag, drop the lookup reference and go straight
                 * to the lu_device_type setup.
                 */
                type->typ_sym_filter = false;
                kobject_put(&type->typ_kobj);
                goto setup_ldt;
        }
#endif
#ifdef CONFIG_PROC_FS
        if (enable_proc && !type->typ_procroot) {
                type->typ_procroot = lprocfs_register(name,
                                                      proc_lustre_root,
                                                      NULL, type);
                if (IS_ERR(type->typ_procroot)) {
                        rc = PTR_ERR(type->typ_procroot);
                        /* Clear the ERR_PTR so the release callback does
                         * not see a bogus proc root.
                         */
                        type->typ_procroot = NULL;
                        GOTO(failed, rc);
                }
        }
#endif
        type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);

        rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
        if (rc)
                GOTO(failed, rc);
#ifdef HAVE_SERVER_SUPPORT
setup_ldt:
#endif
        if (ldt) {
                rc = lu_device_type_init(ldt);
                /* Publish the outcome (ldt, or NULL on failure) and wake
                 * anyone waiting on typ_lu.
                 */
                smp_store_release(&type->typ_lu, rc ? NULL : ldt);
                wake_up_var(&type->typ_lu);
                if (rc)
                        GOTO(failed, rc);
        }

        RETURN(0);

failed:
        /* Dropping the kobject reference runs class_sysfs_release() once
         * the count reaches zero, which cleans up and frees the type.
         */
        kobject_put(&type->typ_kobj);

        RETURN(rc);
}
303 EXPORT_SYMBOL(class_register_type);
304
305 int class_unregister_type(const char *name)
306 {
307         struct obd_type *type = class_search_type(name);
308         int rc = 0;
309         ENTRY;
310
311         if (!type) {
312                 CERROR("unknown obd type\n");
313                 RETURN(-EINVAL);
314         }
315
316         if (atomic_read(&type->typ_refcnt)) {
317                 CERROR("type %s has refcount (%d)\n", name,
318                        atomic_read(&type->typ_refcnt));
319                 /* This is a bad situation, let's make the best of it */
320                 /* Remove ops, but leave the name for debugging */
321                 type->typ_dt_ops = NULL;
322                 type->typ_md_ops = NULL;
323                 GOTO(out_put, rc = -EBUSY);
324         }
325
326         /* Put the final ref */
327         kobject_put(&type->typ_kobj);
328 out_put:
329         /* Put the ref returned by class_search_type() */
330         kobject_put(&type->typ_kobj);
331
332         RETURN(rc);
333 } /* class_unregister_type */
334 EXPORT_SYMBOL(class_unregister_type);
335
336 /**
337  * Create a new obd device.
338  *
339  * Allocate the new obd_device and initialize it.
340  *
341  * \param[in] type_name obd device type string.
342  * \param[in] name      obd device name.
343  * \param[in] uuid      obd device UUID
344  *
345  * \retval newdev         pointer to created obd_device
346  * \retval ERR_PTR(errno) on error
347  */
348 struct obd_device *class_newdev(const char *type_name, const char *name,
349                                 const char *uuid)
350 {
351         struct obd_device *newdev;
352         struct obd_type *type = NULL;
353         ENTRY;
354
355         if (strlen(name) >= MAX_OBD_NAME) {
356                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
357                 RETURN(ERR_PTR(-EINVAL));
358         }
359
360         type = class_get_type(type_name);
361         if (type == NULL){
362                 CERROR("OBD: unknown type: %s\n", type_name);
363                 RETURN(ERR_PTR(-ENODEV));
364         }
365
366         newdev = obd_device_alloc();
367         if (newdev == NULL) {
368                 class_put_type(type);
369                 RETURN(ERR_PTR(-ENOMEM));
370         }
371         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
372         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
373         newdev->obd_type = type;
374         newdev->obd_minor = -1;
375
376         rwlock_init(&newdev->obd_pool_lock);
377         newdev->obd_pool_limit = 0;
378         newdev->obd_pool_slv = 0;
379
380         INIT_LIST_HEAD(&newdev->obd_exports);
381         newdev->obd_num_exports = 0;
382         newdev->obd_grant_check_threshold = 100;
383         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
384         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
385         INIT_LIST_HEAD(&newdev->obd_exports_timed);
386         INIT_LIST_HEAD(&newdev->obd_nid_stats);
387         spin_lock_init(&newdev->obd_nid_lock);
388         spin_lock_init(&newdev->obd_dev_lock);
389         mutex_init(&newdev->obd_dev_mutex);
390         spin_lock_init(&newdev->obd_osfs_lock);
391         /* newdev->obd_osfs_age must be set to a value in the distant
392          * past to guarantee a fresh statfs is fetched on mount. */
393         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
394
395         /* XXX belongs in setup not attach  */
396         init_rwsem(&newdev->obd_observer_link_sem);
397         /* recovery data */
398         spin_lock_init(&newdev->obd_recovery_task_lock);
399         init_waitqueue_head(&newdev->obd_next_transno_waitq);
400         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
401         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
402         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
403         INIT_LIST_HEAD(&newdev->obd_evict_list);
404         INIT_LIST_HEAD(&newdev->obd_lwp_list);
405
406         llog_group_init(&newdev->obd_olg);
407         /* Detach drops this */
408         kref_init(&newdev->obd_refcount);
409         lu_ref_init(&newdev->obd_reference);
410         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
411
412         newdev->obd_conn_inprogress = 0;
413
414         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
415
416         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
417                newdev->obd_name, newdev);
418
419         return newdev;
420 }
421
422 /**
423  * Free obd device.
424  *
425  * \param[in] obd obd_device to be freed
426  *
427  * \retval none
428  */
429 void class_free_dev(struct obd_device *obd)
430 {
431         struct obd_type *obd_type = obd->obd_type;
432
433         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
434                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
435         LASSERTF(obd->obd_minor == -1 || class_num2obd(obd->obd_minor) == obd,
436                  "obd %p != obd_devs[%d] %p\n",
437                  obd, obd->obd_minor, class_num2obd(obd->obd_minor));
438         LASSERTF(kref_read(&obd->obd_refcount) == 0,
439                  "obd_refcount should be 0, not %d\n",
440                  kref_read(&obd->obd_refcount));
441         LASSERT(obd_type != NULL);
442
443         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
444                obd->obd_name, obd->obd_type->typ_name);
445
446         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
447                          obd->obd_name, obd->obd_uuid.uuid);
448         if (obd->obd_stopping) {
449                 int err;
450
451                 /* If we're not stopping, we were never set up */
452                 err = obd_cleanup(obd);
453                 if (err)
454                         CERROR("Cleanup %s returned %d\n",
455                                 obd->obd_name, err);
456         }
457
458         obd_device_free(obd);
459
460         class_put_type(obd_type);
461 }
462
463 /**
464  * Unregister obd device.
465  *
466  * Remove an obd from obd_dev
467  *
468  * \param[in] new_obd obd_device to be unregistered
469  *
470  * \retval none
471  */
472 void class_unregister_device(struct obd_device *obd)
473 {
474         if (obd->obd_minor >= 0) {
475                 xa_erase(&obd_devs, obd->obd_minor);
476                 class_decref(obd, "obd_device_list", obd);
477                 obd->obd_minor = -1;
478                 atomic_dec(&obd_devs_count);
479         }
480 }
481
482 /**
483  * Register obd device.
484  *
485  * Add new_obd to obd_devs
486  *
487  * \param[in] new_obd obd_device to be registered
488  *
489  * \retval 0          success
490  * \retval -EEXIST    device with this name is registered
491  */
492 int class_register_device(struct obd_device *new_obd)
493 {
494         int rc = 0;
495         int dev_no = 0;
496
497         if (new_obd == NULL) {
498                 rc = -1;
499                 goto out;
500         }
501
502         /*
503          * The obd_device could be waiting to be
504          * destroyed by "obd_zombie_impexp_thread"
505          */
506         if (class_name2dev(new_obd->obd_name) != -1)
507                 obd_zombie_barrier();
508
509         if (class_name2dev(new_obd->obd_name) == -1) {
510                 class_incref(new_obd, "obd_device_list", new_obd);
511                 rc = xa_alloc(&obd_devs, &dev_no, new_obd,
512                               xa_limit_31b, GFP_ATOMIC);
513
514                 if (rc != 0)
515                         goto out;
516
517                 new_obd->obd_minor = dev_no;
518                 atomic_inc(&obd_devs_count);
519         } else {
520                 rc = -EEXIST;
521         }
522
523 out:
524         RETURN(rc);
525 }
526
527 int class_name2dev(const char *name)
528 {
529         struct obd_device *obd = NULL;
530         unsigned long dev_no = 0;
531         int ret;
532
533         if (!name)
534                 return -1;
535
536         obd_device_lock();
537         obd_device_for_each(dev_no, obd) {
538                 if (strcmp(name, obd->obd_name) == 0) {
539                         /*
540                          * Make sure we finished attaching before we give
541                          * out any references
542                          */
543                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
544                         if (obd->obd_attached) {
545                                 ret = obd->obd_minor;
546                                 obd_device_unlock();
547                                 return ret;
548                         }
549                         break;
550                 }
551         }
552         obd_device_unlock();
553
554         return -1;
555 }
556 EXPORT_SYMBOL(class_name2dev);
557
558 struct obd_device *class_name2obd(const char *name)
559 {
560         struct obd_device *obd = NULL;
561         unsigned long dev_no = 0;
562
563         if (!name)
564                 return NULL;
565
566         obd_device_lock();
567         obd_device_for_each(dev_no, obd) {
568                 if (strcmp(name, obd->obd_name) == 0) {
569                         /*
570                          * Make sure we finished attaching before we give
571                          * out any references
572                          */
573                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
574                         if (obd->obd_attached)
575                                 break;
576                 }
577         }
578         obd_device_unlock();
579
580         /*
581          * TODO: We give out a reference without class_incref(). This isn't
582          * ideal, but this behavior is identical in previous implementations
583          * of this function.
584          */
585         return obd;
586 }
587 EXPORT_SYMBOL(class_name2obd);
588
589 int class_uuid2dev(struct obd_uuid *uuid)
590 {
591         struct obd_device *obd = NULL;
592         unsigned long dev_no = 0;
593         int ret;
594
595         obd_device_lock();
596         obd_device_for_each(dev_no, obd) {
597                 if (obd_uuid_equals(uuid, &obd->obd_uuid)) {
598                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
599                         ret = obd->obd_minor;
600                         obd_device_unlock();
601                         return ret;
602                 }
603         }
604         obd_device_unlock();
605
606         return -1;
607 }
608 EXPORT_SYMBOL(class_uuid2dev);
609
610 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
611 {
612         struct obd_device *obd = NULL;
613         unsigned long dev_no = 0;
614
615         obd_device_lock();
616         obd_device_for_each(dev_no, obd) {
617                 if (obd_uuid_equals(uuid, &obd->obd_uuid)) {
618                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
619                         break;
620                 }
621         }
622         obd_device_unlock();
623
624         /*
625          * TODO: We give out a reference without class_incref(). This isn't
626          * ideal, but this behavior is identical in previous implementations
627          * of this function.
628          */
629         return obd;
630 }
631 EXPORT_SYMBOL(class_uuid2obd);
632
633 struct obd_device *class_num2obd(int dev_no)
634 {
635         return xa_load(&obd_devs, dev_no);
636 }
637 EXPORT_SYMBOL(class_num2obd);
638
639 /**
640  * Find obd by name or uuid.
641  *
642  * Increment obd's refcount if found.
643  *
644  * \param[in] str obd name or uuid
645  *
646  * \retval NULL    if not found
647  * \retval obd     pointer to found obd_device
648  */
649 struct obd_device *class_str2obd(const char *str)
650 {
651         struct obd_device *obd = NULL;
652         struct obd_uuid uuid;
653         unsigned long dev_no = 0;
654
655         obd_str2uuid(&uuid, str);
656
657         obd_device_lock();
658         obd_device_for_each(dev_no, obd) {
659                 if (obd_uuid_equals(&uuid, &obd->obd_uuid) ||
660                     (strcmp(str, obd->obd_name) == 0)) {
661                         /*
662                          * Make sure we finished attaching before we give
663                          * out any references
664                          */
665                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
666                         if (obd->obd_attached) {
667                                 class_incref(obd, "find", current);
668                                 break;
669                         }
670                         RETURN(NULL);
671                 }
672         }
673         obd_device_unlock();
674
675         RETURN(obd);
676 }
677 EXPORT_SYMBOL(class_str2obd);
678
679 /**
680  * Get obd devices count. Device in any
681  *    state are counted
682  * \retval obd device count
683  */
684 int class_obd_devs_count(void)
685 {
686         return atomic_read(&obd_devs_count);
687 }
688 EXPORT_SYMBOL(class_obd_devs_count);
689
690 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
691  * specified, then only the client with that uuid is returned,
692  * otherwise any client connected to the tgt is returned.
693  */
694 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
695                                          const char *type_name,
696                                          struct obd_uuid *grp_uuid)
697 {
698         struct obd_device *obd = NULL;
699         unsigned long dev_no = 0;
700
701         obd_device_lock();
702         obd_device_for_each(dev_no, obd) {
703                 if ((strncmp(obd->obd_type->typ_name, type_name,
704                              strlen(type_name)) == 0)) {
705                         if (obd_uuid_equals(tgt_uuid,
706                                             &obd->u.cli.cl_target_uuid) &&
707                             ((grp_uuid) ? obd_uuid_equals(grp_uuid,
708                                                          &obd->obd_uuid) : 1)) {
709                                 obd_device_unlock();
710                                 return obd;
711                         }
712                 }
713         }
714         obd_device_unlock();
715
716         return NULL;
717 }
718 EXPORT_SYMBOL(class_find_client_obd);
719
720 /**
721  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
722  * adjust sptlrpc settings accordingly.
723  */
724 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
725 {
726         struct obd_device *obd = NULL;
727         unsigned long dev_no = 0;
728         const char *type;
729         int rc = 0, rc2;
730
731         LASSERT(namelen > 0);
732
733         obd_device_lock();
734         obd_device_for_each(dev_no, obd) {
735                 if (obd->obd_set_up == 0 || obd->obd_stopping)
736                         continue;
737
738                 /* only notify mdc, osc, osp, lwp, mdt, ost
739                  * because only these have a -sptlrpc llog */
740                 type = obd->obd_type->typ_name;
741                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
742                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
743                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
744                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
745                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
746                     strcmp(type, LUSTRE_OST_NAME) != 0)
747                         continue;
748
749                 if (strncmp(obd->obd_name, fsname, namelen))
750                         continue;
751
752                 class_incref(obd, __func__, obd);
753                 obd_device_unlock();
754                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
755                                          sizeof(KEY_SPTLRPC_CONF),
756                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
757                 rc = rc ? rc : rc2;
758                 obd_device_lock();
759                 class_decref(obd, __func__, obd);
760         }
761         obd_device_unlock();
762
763         return rc;
764 }
765 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
766
767 void obd_cleanup_caches(void)
768 {
769         ENTRY;
770         if (obd_device_cachep) {
771                 kmem_cache_destroy(obd_device_cachep);
772                 obd_device_cachep = NULL;
773         }
774
775         EXIT;
776 }
777
778 int obd_init_caches(void)
779 {
780         int rc;
781         ENTRY;
782
783         LASSERT(obd_device_cachep == NULL);
784         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
785                                 sizeof(struct obd_device),
786                                 0, 0, 0, sizeof(struct obd_device), NULL);
787         if (!obd_device_cachep)
788                 GOTO(out, rc = -ENOMEM);
789
790         RETURN(0);
791 out:
792         obd_cleanup_caches();
793         RETURN(rc);
794 }
795
796 static const char export_handle_owner[] = "export";
797
798 /* map connection to client */
799 struct obd_export *class_conn2export(struct lustre_handle *conn)
800 {
801         struct obd_export *export;
802         ENTRY;
803
804         if (!conn) {
805                 CDEBUG(D_CACHE, "looking for null handle\n");
806                 RETURN(NULL);
807         }
808
809         if (conn->cookie == -1) {  /* this means assign a new connection */
810                 CDEBUG(D_CACHE, "want a new connection\n");
811                 RETURN(NULL);
812         }
813
814         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
815         export = class_handle2object(conn->cookie, export_handle_owner);
816         RETURN(export);
817 }
818 EXPORT_SYMBOL(class_conn2export);
819
820 struct obd_device *class_exp2obd(struct obd_export *exp)
821 {
822         if (exp)
823                 return exp->exp_obd;
824         return NULL;
825 }
826 EXPORT_SYMBOL(class_exp2obd);
827
828 struct obd_import *class_exp2cliimp(struct obd_export *exp)
829 {
830         struct obd_device *obd = exp->exp_obd;
831         if (obd == NULL)
832                 return NULL;
833         return obd->u.cli.cl_import;
834 }
835 EXPORT_SYMBOL(class_exp2cliimp);
836
837 /* Export management functions */
838 static void class_export_destroy(struct obd_export *exp)
839 {
840         struct obd_device *obd = exp->exp_obd;
841         ENTRY;
842
843         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
844         LASSERT(obd != NULL);
845
846         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
847                exp->exp_client_uuid.uuid, obd->obd_name);
848
849         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
850         ptlrpc_connection_put(exp->exp_connection);
851
852         LASSERT(list_empty(&exp->exp_outstanding_replies));
853         LASSERT(list_empty(&exp->exp_uncommitted_replies));
854         LASSERT(list_empty(&exp->exp_req_replay_queue));
855         LASSERT(list_empty(&exp->exp_hp_rpcs));
856         obd_destroy_export(exp);
857         /* self export doesn't hold a reference to an obd, although it
858          * exists until freeing of the obd */
859         if (exp != obd->obd_self_export)
860                 class_decref(obd, "export", exp);
861
862         OBD_FREE_PRE(exp, sizeof(*exp), "kfree_rcu");
863         kfree_rcu(exp, exp_handle.h_rcu);
864         EXIT;
865 }
866
867 struct obd_export *class_export_get(struct obd_export *exp)
868 {
869         refcount_inc(&exp->exp_handle.h_ref);
870         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
871                refcount_read(&exp->exp_handle.h_ref));
872         return exp;
873 }
874 EXPORT_SYMBOL(class_export_get);
875
876 void class_export_put(struct obd_export *exp)
877 {
878         LASSERT(exp != NULL);
879         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
880         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
881         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
882                refcount_read(&exp->exp_handle.h_ref) - 1);
883
884         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
885                 struct obd_device *obd = exp->exp_obd;
886
887                 CDEBUG(D_IOCTL, "final put %p/%s\n",
888                        exp, exp->exp_client_uuid.uuid);
889
890                 /* release nid stat refererence */
891                 lprocfs_exp_cleanup(exp);
892
893                 if (exp == obd->obd_self_export) {
894                         /* self export should be destroyed without
895                          * zombie thread as it doesn't hold a
896                          * reference to obd and doesn't hold any
897                          * resources */
898                         class_export_destroy(exp);
899                         /* self export is destroyed, no class
900                          * references exist and it is safe to free
901                          * obd */
902                         class_free_dev(obd);
903                 } else {
904                         LASSERT(!list_empty(&exp->exp_obd_chain));
905                         obd_zombie_export_add(exp);
906                 }
907
908         }
909 }
910 EXPORT_SYMBOL(class_export_put);
911
912 static void obd_zombie_exp_cull(struct work_struct *ws)
913 {
914         struct obd_export *export;
915
916         export = container_of(ws, struct obd_export, exp_zombie_work);
917         class_export_destroy(export);
918         LASSERT(atomic_read(&obd_stale_export_num) > 0);
919         if (atomic_dec_and_test(&obd_stale_export_num))
920                 wake_up_var(&obd_stale_export_num);
921 }
922
923 /* Creates a new export, adds it to the hash table, and returns a
924  * pointer to it. The refcount is 2: one for the hash reference, and
925  * one for the pointer returned by this function. */
926 static struct obd_export *__class_new_export(struct obd_device *obd,
927                                              struct obd_uuid *cluuid,
928                                              bool is_self)
929 {
930         struct obd_export *export;
931         int rc = 0;
932         ENTRY;
933
934         OBD_ALLOC_PTR(export);
935         if (!export)
936                 return ERR_PTR(-ENOMEM);
937
938         export->exp_conn_cnt = 0;
939         export->exp_lock_hash = NULL;
940         export->exp_flock_hash = NULL;
941         /* 2 = class_handle_hash + last */
942         refcount_set(&export->exp_handle.h_ref, 2);
943         atomic_set(&export->exp_rpc_count, 0);
944         atomic_set(&export->exp_cb_count, 0);
945         atomic_set(&export->exp_locks_count, 0);
946 #if LUSTRE_TRACKS_LOCK_EXP_REFS
947         INIT_LIST_HEAD(&export->exp_locks_list);
948         spin_lock_init(&export->exp_locks_list_guard);
949 #endif
950         atomic_set(&export->exp_replay_count, 0);
951         export->exp_obd = obd;
952         INIT_LIST_HEAD(&export->exp_outstanding_replies);
953         spin_lock_init(&export->exp_uncommitted_replies_lock);
954         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
955         INIT_LIST_HEAD(&export->exp_req_replay_queue);
956         INIT_HLIST_NODE(&export->exp_handle.h_link);
957         INIT_LIST_HEAD(&export->exp_hp_rpcs);
958         INIT_LIST_HEAD(&export->exp_reg_rpcs);
959         class_handle_hash(&export->exp_handle, export_handle_owner);
960         export->exp_last_request_time = ktime_get_real_seconds();
961         spin_lock_init(&export->exp_lock);
962         spin_lock_init(&export->exp_rpc_lock);
963         INIT_HLIST_NODE(&export->exp_gen_hash);
964         spin_lock_init(&export->exp_bl_list_lock);
965         INIT_LIST_HEAD(&export->exp_bl_list);
966         INIT_LIST_HEAD(&export->exp_stale_list);
967         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
968
969         export->exp_sp_peer = LUSTRE_SP_ANY;
970         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
971         export->exp_client_uuid = *cluuid;
972         obd_init_export(export);
973
974         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
975         export->exp_root_fid.f_seq = 0;
976         export->exp_root_fid.f_oid = 0;
977         export->exp_root_fid.f_ver = 0;
978
979         spin_lock(&obd->obd_dev_lock);
980         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
981                 /* shouldn't happen, but might race */
982                 if (obd->obd_stopping)
983                         GOTO(exit_unlock, rc = -ENODEV);
984
985                 rc = obd_uuid_add(obd, export);
986                 if (rc != 0) {
987                         LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
988                                       obd->obd_name, cluuid->uuid, rc);
989                         GOTO(exit_unlock, rc = -EALREADY);
990                 }
991         }
992
993         if (!is_self) {
994                 class_incref(obd, "export", export);
995                 list_add_tail(&export->exp_obd_chain_timed,
996                               &obd->obd_exports_timed);
997                 list_add(&export->exp_obd_chain, &obd->obd_exports);
998                 obd->obd_num_exports++;
999         } else {
1000                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1001                 INIT_LIST_HEAD(&export->exp_obd_chain);
1002         }
1003         spin_unlock(&obd->obd_dev_lock);
1004         RETURN(export);
1005
1006 exit_unlock:
1007         spin_unlock(&obd->obd_dev_lock);
1008         class_handle_unhash(&export->exp_handle);
1009         obd_destroy_export(export);
1010         OBD_FREE_PTR(export);
1011         return ERR_PTR(rc);
1012 }
1013
1014 struct obd_export *class_new_export(struct obd_device *obd,
1015                                     struct obd_uuid *uuid)
1016 {
1017         return __class_new_export(obd, uuid, false);
1018 }
1019 EXPORT_SYMBOL(class_new_export);
1020
1021 struct obd_export *class_new_export_self(struct obd_device *obd,
1022                                          struct obd_uuid *uuid)
1023 {
1024         return __class_new_export(obd, uuid, true);
1025 }
1026
1027 void class_unlink_export(struct obd_export *exp)
1028 {
1029         class_handle_unhash(&exp->exp_handle);
1030
1031         if (exp->exp_obd->obd_self_export == exp) {
1032                 class_export_put(exp);
1033                 return;
1034         }
1035
1036         spin_lock(&exp->exp_obd->obd_dev_lock);
1037         /* delete an uuid-export hashitem from hashtables */
1038         if (exp != exp->exp_obd->obd_self_export)
1039                 obd_uuid_del(exp->exp_obd, exp);
1040
1041 #ifdef HAVE_SERVER_SUPPORT
1042         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1043                 struct tg_export_data   *ted = &exp->exp_target_data;
1044                 struct cfs_hash         *hash;
1045
1046                 /* Because obd_gen_hash will not be released until
1047                  * class_cleanup(), so hash should never be NULL here */
1048                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1049                 LASSERT(hash != NULL);
1050                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1051                              &exp->exp_gen_hash);
1052                 cfs_hash_putref(hash);
1053         }
1054 #endif /* HAVE_SERVER_SUPPORT */
1055
1056         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1057         list_del_init(&exp->exp_obd_chain_timed);
1058         exp->exp_obd->obd_num_exports--;
1059         spin_unlock(&exp->exp_obd->obd_dev_lock);
1060
1061         /* A reference is kept by obd_stale_exports list */
1062         obd_stale_export_put(exp);
1063 }
1064 EXPORT_SYMBOL(class_unlink_export);
1065
1066 /* Import management functions */
1067 static void obd_zombie_import_free(struct obd_import *imp)
1068 {
1069         struct obd_import_conn *imp_conn;
1070
1071         ENTRY;
1072         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1073                imp->imp_obd->obd_name);
1074
1075         LASSERT(refcount_read(&imp->imp_refcount) == 0);
1076
1077         ptlrpc_connection_put(imp->imp_connection);
1078
1079         while ((imp_conn = list_first_entry_or_null(&imp->imp_conn_list,
1080                                                     struct obd_import_conn,
1081                                                     oic_item)) != NULL) {
1082                 list_del_init(&imp_conn->oic_item);
1083                 ptlrpc_connection_put(imp_conn->oic_conn);
1084                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1085         }
1086
1087         LASSERT(imp->imp_sec == NULL);
1088         LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1089                  imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1090         class_decref(imp->imp_obd, "import", imp);
1091         OBD_FREE_PTR(imp);
1092         EXIT;
1093 }
1094
1095 struct obd_import *class_import_get(struct obd_import *import)
1096 {
1097         refcount_inc(&import->imp_refcount);
1098         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1099                refcount_read(&import->imp_refcount),
1100                import->imp_obd->obd_name);
1101         return import;
1102 }
1103 EXPORT_SYMBOL(class_import_get);
1104
1105 void class_import_put(struct obd_import *imp)
1106 {
1107         ENTRY;
1108
1109         LASSERT(refcount_read(&imp->imp_refcount) > 0);
1110
1111         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1112                refcount_read(&imp->imp_refcount) - 1,
1113                imp->imp_obd->obd_name);
1114
1115         if (refcount_dec_and_test(&imp->imp_refcount)) {
1116                 CDEBUG(D_INFO, "final put import %p\n", imp);
1117                 obd_zombie_import_add(imp);
1118         }
1119
1120         EXIT;
1121 }
1122 EXPORT_SYMBOL(class_import_put);
1123
1124 static void init_imp_at(struct imp_at *at) {
1125         int i;
1126         at_init(&at->iat_net_latency, 0, 0);
1127         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1128                 /* max service estimates are tracked on the server side, so
1129                    don't use the AT history here, just use the last reported
1130                    val. (But keep hist for proc histogram, worst_ever) */
1131                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1132                         AT_FLG_NOHIST);
1133         }
1134 }
1135
1136 static void obd_zombie_imp_cull(struct work_struct *ws)
1137 {
1138         struct obd_import *import;
1139
1140         import = container_of(ws, struct obd_import, imp_zombie_work);
1141         obd_zombie_import_free(import);
1142 }
1143
1144 struct obd_import *class_new_import(struct obd_device *obd)
1145 {
1146         struct obd_import *imp;
1147         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1148
1149         OBD_ALLOC(imp, sizeof(*imp));
1150         if (imp == NULL)
1151                 return NULL;
1152
1153         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1154         INIT_LIST_HEAD(&imp->imp_replay_list);
1155         INIT_LIST_HEAD(&imp->imp_sending_list);
1156         INIT_LIST_HEAD(&imp->imp_delayed_list);
1157         INIT_LIST_HEAD(&imp->imp_committed_list);
1158         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1159         imp->imp_known_replied_xid = 0;
1160         imp->imp_replay_cursor = &imp->imp_committed_list;
1161         spin_lock_init(&imp->imp_lock);
1162         imp->imp_last_success_conn = 0;
1163         imp->imp_state = LUSTRE_IMP_NEW;
1164         imp->imp_obd = class_incref(obd, "import", imp);
1165         rwlock_init(&imp->imp_sec_lock);
1166         init_waitqueue_head(&imp->imp_recovery_waitq);
1167         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1168
1169         if (curr_pid_ns && curr_pid_ns->child_reaper)
1170                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1171         else
1172                 imp->imp_sec_refpid = 1;
1173
1174         refcount_set(&imp->imp_refcount, 2);
1175         atomic_set(&imp->imp_unregistering, 0);
1176         atomic_set(&imp->imp_reqs, 0);
1177         atomic_set(&imp->imp_inflight, 0);
1178         atomic_set(&imp->imp_replay_inflight, 0);
1179         init_waitqueue_head(&imp->imp_replay_waitq);
1180         atomic_set(&imp->imp_inval_count, 0);
1181         atomic_set(&imp->imp_waiting, 0);
1182         INIT_LIST_HEAD(&imp->imp_conn_list);
1183         init_imp_at(&imp->imp_at);
1184
1185         /* the default magic is V2, will be used in connect RPC, and
1186          * then adjusted according to the flags in request/reply. */
1187         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1188
1189         return imp;
1190 }
1191 EXPORT_SYMBOL(class_new_import);
1192
1193 void class_destroy_import(struct obd_import *import)
1194 {
1195         LASSERT(import != NULL);
1196         LASSERT(import != LP_POISON);
1197
1198         spin_lock(&import->imp_lock);
1199         import->imp_generation++;
1200         spin_unlock(&import->imp_lock);
1201         class_import_put(import);
1202 }
1203 EXPORT_SYMBOL(class_destroy_import);
1204
1205 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1206
1207 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1208 {
1209         spin_lock(&exp->exp_locks_list_guard);
1210
1211         LASSERT(lock->l_exp_refs_nr >= 0);
1212
1213         if (lock->l_exp_refs_target != NULL &&
1214             lock->l_exp_refs_target != exp) {
1215                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1216                               exp, lock, lock->l_exp_refs_target);
1217         }
1218         if ((lock->l_exp_refs_nr ++) == 0) {
1219                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1220                 lock->l_exp_refs_target = exp;
1221         }
1222         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1223                lock, exp, lock->l_exp_refs_nr);
1224         spin_unlock(&exp->exp_locks_list_guard);
1225 }
1226 EXPORT_SYMBOL(__class_export_add_lock_ref);
1227
1228 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1229 {
1230         spin_lock(&exp->exp_locks_list_guard);
1231         LASSERT(lock->l_exp_refs_nr > 0);
1232         if (lock->l_exp_refs_target != exp) {
1233                 LCONSOLE_WARN("lock %p, "
1234                               "mismatching export pointers: %p, %p\n",
1235                               lock, lock->l_exp_refs_target, exp);
1236         }
1237         if (-- lock->l_exp_refs_nr == 0) {
1238                 list_del_init(&lock->l_exp_refs_link);
1239                 lock->l_exp_refs_target = NULL;
1240         }
1241         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1242                lock, exp, lock->l_exp_refs_nr);
1243         spin_unlock(&exp->exp_locks_list_guard);
1244 }
1245 EXPORT_SYMBOL(__class_export_del_lock_ref);
1246 #endif
1247
1248 /* A connection defines an export context in which preallocation can
1249    be managed. This releases the export pointer reference, and returns
1250    the export handle, so the export refcount is 1 when this function
1251    returns. */
1252 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1253                   struct obd_uuid *cluuid)
1254 {
1255         struct obd_export *export;
1256         LASSERT(conn != NULL);
1257         LASSERT(obd != NULL);
1258         LASSERT(cluuid != NULL);
1259         ENTRY;
1260
1261         export = class_new_export(obd, cluuid);
1262         if (IS_ERR(export))
1263                 RETURN(PTR_ERR(export));
1264
1265         conn->cookie = export->exp_handle.h_cookie;
1266         class_export_put(export);
1267
1268         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1269                cluuid->uuid, conn->cookie);
1270         RETURN(0);
1271 }
1272 EXPORT_SYMBOL(class_connect);
1273
1274 /* if export is involved in recovery then clean up related things */
1275 static void class_export_recovery_cleanup(struct obd_export *exp)
1276 {
1277         struct obd_device *obd = exp->exp_obd;
1278
1279         spin_lock(&obd->obd_recovery_task_lock);
1280         if (obd->obd_recovering) {
1281                 if (exp->exp_in_recovery) {
1282                         spin_lock(&exp->exp_lock);
1283                         exp->exp_in_recovery = 0;
1284                         spin_unlock(&exp->exp_lock);
1285                         LASSERT(atomic_read(&(obd)->obd_connected_clients) > 0);
1286                         atomic_dec(&obd->obd_connected_clients);
1287                 }
1288
1289                 /* if called during recovery then should update
1290                  * obd_stale_clients counter,
1291                  * lightweight exports are not counted */
1292                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1293                         exp->exp_obd->obd_stale_clients++;
1294         }
1295         spin_unlock(&obd->obd_recovery_task_lock);
1296
1297         spin_lock(&exp->exp_lock);
1298         /** Cleanup req replay fields */
1299         if (exp->exp_req_replay_needed) {
1300                 exp->exp_req_replay_needed = 0;
1301
1302                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1303                 atomic_dec(&obd->obd_req_replay_clients);
1304         }
1305
1306         /** Cleanup lock replay data */
1307         if (exp->exp_lock_replay_needed) {
1308                 exp->exp_lock_replay_needed = 0;
1309
1310                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1311                 atomic_dec(&obd->obd_lock_replay_clients);
1312         }
1313         spin_unlock(&exp->exp_lock);
1314 }
1315
1316 /* This function removes 1-3 references from the export:
1317  * 1 - for export pointer passed
1318  * and if disconnect really need
1319  * 2 - removing from hash
1320  * 3 - in client_unlink_export
1321  * The export pointer passed to this function can destroyed */
1322 int class_disconnect(struct obd_export *export)
1323 {
1324         int already_disconnected;
1325         ENTRY;
1326
1327         if (export == NULL) {
1328                 CWARN("attempting to free NULL export %p\n", export);
1329                 RETURN(-EINVAL);
1330         }
1331
1332         spin_lock(&export->exp_lock);
1333         already_disconnected = export->exp_disconnected;
1334         export->exp_disconnected = 1;
1335 #ifdef HAVE_SERVER_SUPPORT
1336         /*  We hold references of export for uuid hash
1337          *  and nid_hash and export link at least. So
1338          *  it is safe to call rh*table_remove_fast in
1339          *  there.
1340          */
1341         obd_nid_del(export->exp_obd, export);
1342 #endif /* HAVE_SERVER_SUPPORT */
1343         spin_unlock(&export->exp_lock);
1344
1345         /* class_cleanup(), abort_recovery(), and class_fail_export()
1346          * all end up in here, and if any of them race we shouldn't
1347          * call extra class_export_puts(). */
1348         if (already_disconnected)
1349                 GOTO(no_disconn, already_disconnected);
1350
1351         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1352                export->exp_handle.h_cookie);
1353
1354         class_export_recovery_cleanup(export);
1355         class_unlink_export(export);
1356 no_disconn:
1357         class_export_put(export);
1358         RETURN(0);
1359 }
1360 EXPORT_SYMBOL(class_disconnect);
1361
1362 /* Return non-zero for a fully connected export */
1363 int class_connected_export(struct obd_export *exp)
1364 {
1365         int connected = 0;
1366
1367         if (exp) {
1368                 spin_lock(&exp->exp_lock);
1369                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1370                 spin_unlock(&exp->exp_lock);
1371         }
1372         return connected;
1373 }
1374 EXPORT_SYMBOL(class_connected_export);
1375
1376 static void class_disconnect_export_list(struct list_head *list,
1377                                          enum obd_option flags)
1378 {
1379         int rc;
1380         struct obd_export *exp;
1381         ENTRY;
1382
1383         /* It's possible that an export may disconnect itself, but
1384          * nothing else will be added to this list.
1385          */
1386         while ((exp = list_first_entry_or_null(list, struct obd_export,
1387                                                exp_obd_chain)) != NULL) {
1388                 /* need for safe call CDEBUG after obd_disconnect */
1389                 class_export_get(exp);
1390
1391                 spin_lock(&exp->exp_lock);
1392                 exp->exp_flags = flags;
1393                 spin_unlock(&exp->exp_lock);
1394
1395                 if (obd_uuid_equals(&exp->exp_client_uuid,
1396                                     &exp->exp_obd->obd_uuid)) {
1397                         CDEBUG(D_HA,
1398                                "exp %p export uuid == obd uuid, don't discon\n",
1399                                exp);
1400                         /* Need to delete this now so we don't end up pointing
1401                          * to work_list later when this export is cleaned up. */
1402                         list_del_init(&exp->exp_obd_chain);
1403                         class_export_put(exp);
1404                         continue;
1405                 }
1406
1407                 class_export_get(exp);
1408                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1409                        "last request at %lld\n",
1410                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1411                        exp, exp->exp_last_request_time);
1412                 /* release one export reference anyway */
1413                 rc = obd_disconnect(exp);
1414
1415                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1416                        obd_export_nid2str(exp), exp, rc);
1417                 class_export_put(exp);
1418         }
1419         EXIT;
1420 }
1421
1422 void class_disconnect_exports(struct obd_device *obd)
1423 {
1424         LIST_HEAD(work_list);
1425         ENTRY;
1426
1427         /* Move all of the exports from obd_exports to a work list, en masse. */
1428         spin_lock(&obd->obd_dev_lock);
1429         list_splice_init(&obd->obd_exports, &work_list);
1430         list_splice_init(&obd->obd_delayed_exports, &work_list);
1431         spin_unlock(&obd->obd_dev_lock);
1432
1433         if (!list_empty(&work_list)) {
1434                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1435                        "disconnecting them\n", obd->obd_minor, obd);
1436                 class_disconnect_export_list(&work_list,
1437                                              exp_flags_from_obd(obd));
1438         } else
1439                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1440                        obd->obd_minor, obd);
1441         EXIT;
1442 }
1443 EXPORT_SYMBOL(class_disconnect_exports);
1444
1445 /* Remove exports that have not completed recovery.
1446  */
1447 void class_disconnect_stale_exports(struct obd_device *obd,
1448                                     int (*test_export)(struct obd_export *))
1449 {
1450         LIST_HEAD(work_list);
1451         struct obd_export *exp, *n;
1452         int evicted = 0;
1453         ENTRY;
1454
1455         spin_lock(&obd->obd_dev_lock);
1456         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1457                                  exp_obd_chain) {
1458                 /* don't count self-export as client */
1459                 if (obd_uuid_equals(&exp->exp_client_uuid,
1460                                     &exp->exp_obd->obd_uuid))
1461                         continue;
1462
1463                 /* don't evict clients which have no slot in last_rcvd
1464                  * (e.g. lightweight connection) */
1465                 if (exp->exp_target_data.ted_lr_idx == -1)
1466                         continue;
1467
1468                 spin_lock(&exp->exp_lock);
1469                 if (exp->exp_failed || test_export(exp)) {
1470                         spin_unlock(&exp->exp_lock);
1471                         continue;
1472                 }
1473                 exp->exp_failed = 1;
1474                 atomic_inc(&exp->exp_obd->obd_eviction_count);
1475                 spin_unlock(&exp->exp_lock);
1476
1477                 list_move(&exp->exp_obd_chain, &work_list);
1478                 evicted++;
1479                 CWARN("%s: disconnect stale client %s@%s\n",
1480                       obd->obd_name, exp->exp_client_uuid.uuid,
1481                       obd_export_nid2str(exp));
1482                 print_export_data(exp, "EVICTING", 0, D_HA);
1483         }
1484         spin_unlock(&obd->obd_dev_lock);
1485
1486         if (evicted)
1487                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1488                               obd->obd_name, evicted);
1489
1490         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1491                                                  OBD_OPT_ABORT_RECOV);
1492         EXIT;
1493 }
1494 EXPORT_SYMBOL(class_disconnect_stale_exports);
1495
1496 void class_fail_export(struct obd_export *exp)
1497 {
1498         int rc, already_failed;
1499
1500         spin_lock(&exp->exp_lock);
1501         already_failed = exp->exp_failed;
1502         exp->exp_failed = 1;
1503         spin_unlock(&exp->exp_lock);
1504
1505         if (already_failed) {
1506                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1507                        exp, exp->exp_client_uuid.uuid);
1508                 return;
1509         }
1510
1511         atomic_inc(&exp->exp_obd->obd_eviction_count);
1512
1513         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1514                exp, exp->exp_client_uuid.uuid);
1515
1516         if (obd_dump_on_timeout)
1517                 libcfs_debug_dumplog();
1518
1519         /* need for safe call CDEBUG after obd_disconnect */
1520         class_export_get(exp);
1521
1522         /* Most callers into obd_disconnect are removing their own reference
1523          * (request, for example) in addition to the one from the hash table.
1524          * We don't have such a reference here, so make one. */
1525         class_export_get(exp);
1526         rc = obd_disconnect(exp);
1527         if (rc)
1528                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1529         else
1530                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1531                        exp, exp->exp_client_uuid.uuid);
1532         class_export_put(exp);
1533 }
1534 EXPORT_SYMBOL(class_fail_export);
1535
1536 #ifdef HAVE_SERVER_SUPPORT
1537
1538 static int take_first(struct obd_export *exp, void *data)
1539 {
1540         struct obd_export **expp = data;
1541
1542         if (*expp)
1543                 /* already have one */
1544                 return 0;
1545         if (exp->exp_failed)
1546                 /* Don't want this one */
1547                 return 0;
1548         if (!refcount_inc_not_zero(&exp->exp_handle.h_ref))
1549                 /* Cannot get a ref on this one */
1550                 return 0;
1551         *expp = exp;
1552         return 1;
1553 }
1554
1555 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1556 {
1557         struct lnet_nid nid_key;
1558         struct obd_export *doomed_exp;
1559         int exports_evicted = 0;
1560
1561         libcfs_strnid(&nid_key, nid);
1562
1563         spin_lock(&obd->obd_dev_lock);
1564         /* umount has run already, so evict thread should leave
1565          * its task to umount thread now */
1566         if (obd->obd_stopping) {
1567                 spin_unlock(&obd->obd_dev_lock);
1568                 return exports_evicted;
1569         }
1570         spin_unlock(&obd->obd_dev_lock);
1571
1572         doomed_exp = NULL;
1573         while (obd_nid_export_for_each(obd, &nid_key,
1574                                        take_first, &doomed_exp) > 0) {
1575
1576                 LASSERTF(doomed_exp != obd->obd_self_export,
1577                          "self-export is hashed by NID?\n");
1578
1579                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1580                               obd->obd_name,
1581                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1582                               obd_export_nid2str(doomed_exp));
1583
1584                 class_fail_export(doomed_exp);
1585                 class_export_put(doomed_exp);
1586                 exports_evicted++;
1587                 doomed_exp = NULL;
1588         }
1589
1590         if (!exports_evicted)
1591                 CDEBUG(D_HA,
1592                        "%s: can't disconnect NID '%s': no exports found\n",
1593                        obd->obd_name, nid);
1594         return exports_evicted;
1595 }
1596 EXPORT_SYMBOL(obd_export_evict_by_nid);
1597
1598 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1599 {
1600         struct obd_export *doomed_exp = NULL;
1601         struct obd_uuid doomed_uuid;
1602         int exports_evicted = 0;
1603
1604         spin_lock(&obd->obd_dev_lock);
1605         if (obd->obd_stopping) {
1606                 spin_unlock(&obd->obd_dev_lock);
1607                 return exports_evicted;
1608         }
1609         spin_unlock(&obd->obd_dev_lock);
1610
1611         obd_str2uuid(&doomed_uuid, uuid);
1612         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1613                 CERROR("%s: can't evict myself\n", obd->obd_name);
1614                 return exports_evicted;
1615         }
1616
1617         doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1618         if (doomed_exp == NULL) {
1619                 CERROR("%s: can't disconnect %s: no exports found\n",
1620                        obd->obd_name, uuid);
1621         } else {
1622                 CWARN("%s: evicting %s at adminstrative request\n",
1623                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1624                 class_fail_export(doomed_exp);
1625                 class_export_put(doomed_exp);
1626                 obd_uuid_del(obd, doomed_exp);
1627                 exports_evicted++;
1628         }
1629
1630         return exports_evicted;
1631 }
1632 #endif /* HAVE_SERVER_SUPPORT */
1633
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback invoked by print_export_data() when lock dumping is
 * requested; NULL when no hook is installed. */
void (*class_export_dump_hook)(struct obd_export *exp) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif /* LUSTRE_TRACKS_LOCK_EXP_REFS */
1638
1639 static void print_export_data(struct obd_export *exp, const char *status,
1640                               int locks, int debug_level)
1641 {
1642         struct ptlrpc_reply_state *rs;
1643         struct ptlrpc_reply_state *first_reply = NULL;
1644         int nreplies = 0;
1645
1646         spin_lock(&exp->exp_lock);
1647         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1648                             rs_exp_list) {
1649                 if (nreplies == 0)
1650                         first_reply = rs;
1651                 nreplies++;
1652         }
1653         spin_unlock(&exp->exp_lock);
1654
1655         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1656                "%p %s %llu stale:%d\n",
1657                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1658                obd_export_nid2str(exp),
1659                refcount_read(&exp->exp_handle.h_ref),
1660                atomic_read(&exp->exp_rpc_count),
1661                atomic_read(&exp->exp_cb_count),
1662                atomic_read(&exp->exp_locks_count),
1663                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1664                nreplies, first_reply, nreplies > 3 ? "..." : "",
1665                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1666 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1667         if (locks && class_export_dump_hook != NULL)
1668                 class_export_dump_hook(exp);
1669 #endif
1670 }
1671
1672 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1673 {
1674         struct obd_export *exp;
1675
1676         spin_lock(&obd->obd_dev_lock);
1677         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1678                 print_export_data(exp, "ACTIVE", locks, debug_level);
1679         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1680                 print_export_data(exp, "UNLINKED", locks, debug_level);
1681         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1682                 print_export_data(exp, "DELAYED", locks, debug_level);
1683         spin_unlock(&obd->obd_dev_lock);
1684 }
1685
1686 void obd_exports_barrier(struct obd_device *obd)
1687 {
1688         int waited = 2;
1689         LASSERT(list_empty(&obd->obd_exports));
1690         spin_lock(&obd->obd_dev_lock);
1691         while (!list_empty(&obd->obd_unlinked_exports)) {
1692                 spin_unlock(&obd->obd_dev_lock);
1693                 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1694                 if (waited > 5 && is_power_of_2(waited)) {
1695                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1696                                       "more than %d seconds. "
1697                                       "The obd refcount = %d. Is it stuck?\n",
1698                                       obd->obd_name, waited,
1699                                       kref_read(&obd->obd_refcount));
1700                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1701                 }
1702                 waited *= 2;
1703                 spin_lock(&obd->obd_dev_lock);
1704         }
1705         spin_unlock(&obd->obd_dev_lock);
1706 }
1707 EXPORT_SYMBOL(obd_exports_barrier);
1708
1709 /**
1710  * Add export to the obd_zombe thread and notify it.
1711  */
1712 static void obd_zombie_export_add(struct obd_export *exp) {
1713         atomic_inc(&obd_stale_export_num);
1714         spin_lock(&exp->exp_obd->obd_dev_lock);
1715         LASSERT(!list_empty(&exp->exp_obd_chain));
1716         list_del_init(&exp->exp_obd_chain);
1717         spin_unlock(&exp->exp_obd->obd_dev_lock);
1718         queue_work(zombie_wq, &exp->exp_zombie_work);
1719 }
1720
1721 /**
1722  * Add import to the obd_zombe thread and notify it.
1723  */
1724 static void obd_zombie_import_add(struct obd_import *imp) {
1725         LASSERT(imp->imp_sec == NULL);
1726
1727         queue_work(zombie_wq, &imp->imp_zombie_work);
1728 }
1729
1730 /**
1731  * wait when obd_zombie import/export queues become empty
1732  */
1733 void obd_zombie_barrier(void)
1734 {
1735         wait_var_event(&obd_stale_export_num,
1736                         atomic_read(&obd_stale_export_num) == 0);
1737         flush_workqueue(zombie_wq);
1738 }
1739 EXPORT_SYMBOL(obd_zombie_barrier);
1740
1741
1742 struct obd_export *obd_stale_export_get(void)
1743 {
1744         struct obd_export *exp = NULL;
1745         ENTRY;
1746
1747         spin_lock(&obd_stale_export_lock);
1748         if (!list_empty(&obd_stale_exports)) {
1749                 exp = list_first_entry(&obd_stale_exports,
1750                                        struct obd_export, exp_stale_list);
1751                 list_del_init(&exp->exp_stale_list);
1752         }
1753         spin_unlock(&obd_stale_export_lock);
1754
1755         if (exp) {
1756                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1757                        atomic_read(&obd_stale_export_num));
1758         }
1759         RETURN(exp);
1760 }
1761 EXPORT_SYMBOL(obd_stale_export_get);
1762
1763 void obd_stale_export_put(struct obd_export *exp)
1764 {
1765         ENTRY;
1766
1767         LASSERT(list_empty(&exp->exp_stale_list));
1768         if (exp->exp_lock_hash &&
1769             atomic_read(&exp->exp_lock_hash->hs_count)) {
1770                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1771                        atomic_read(&obd_stale_export_num));
1772
1773                 spin_lock_bh(&exp->exp_bl_list_lock);
1774                 spin_lock(&obd_stale_export_lock);
1775                 /* Add to the tail if there is no blocked locks,
1776                  * to the head otherwise. */
1777                 if (list_empty(&exp->exp_bl_list))
1778                         list_add_tail(&exp->exp_stale_list,
1779                                       &obd_stale_exports);
1780                 else
1781                         list_add(&exp->exp_stale_list,
1782                                  &obd_stale_exports);
1783
1784                 spin_unlock(&obd_stale_export_lock);
1785                 spin_unlock_bh(&exp->exp_bl_list_lock);
1786         } else {
1787                 class_export_put(exp);
1788         }
1789         EXIT;
1790 }
1791 EXPORT_SYMBOL(obd_stale_export_put);
1792
1793 /**
1794  * Adjust the position of the export in the stale list,
1795  * i.e. move to the head of the list if is needed.
1796  **/
1797 void obd_stale_export_adjust(struct obd_export *exp)
1798 {
1799         LASSERT(exp != NULL);
1800         spin_lock_bh(&exp->exp_bl_list_lock);
1801         spin_lock(&obd_stale_export_lock);
1802
1803         if (!list_empty(&exp->exp_stale_list) &&
1804             !list_empty(&exp->exp_bl_list))
1805                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1806
1807         spin_unlock(&obd_stale_export_lock);
1808         spin_unlock_bh(&exp->exp_bl_list_lock);
1809 }
1810 EXPORT_SYMBOL(obd_stale_export_adjust);
1811
1812 /**
1813  * start destroy zombie import/export thread
1814  */
1815 int obd_zombie_impexp_init(void)
1816 {
1817         zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
1818                                            0, CFS_CPT_ANY,
1819                                            cfs_cpt_number(cfs_cpt_tab));
1820
1821         return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
1822 }
1823
1824 /**
1825  * stop destroy zombie import/export thread
1826  */
1827 void obd_zombie_impexp_stop(void)
1828 {
1829         destroy_workqueue(zombie_wq);
1830         LASSERT(list_empty(&obd_stale_exports));
1831 }
1832
1833 /***** Kernel-userspace comm helpers *******/
1834
1835 /* Get length of entire message, including header */
1836 int kuc_len(int payload_len)
1837 {
1838         return sizeof(struct kuc_hdr) + payload_len;
1839 }
1840 EXPORT_SYMBOL(kuc_len);
1841
1842 /* Get a pointer to kuc header, given a ptr to the payload
1843  * @param p Pointer to payload area
1844  * @returns Pointer to kuc header
1845  */
1846 struct kuc_hdr * kuc_ptr(void *p)
1847 {
1848         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1849         LASSERT(lh->kuc_magic == KUC_MAGIC);
1850         return lh;
1851 }
1852 EXPORT_SYMBOL(kuc_ptr);
1853
1854 /* Alloc space for a message, and fill in header
1855  * @return Pointer to payload area
1856  */
1857 void *kuc_alloc(int payload_len, int transport, int type)
1858 {
1859         struct kuc_hdr *lh;
1860         int len = kuc_len(payload_len);
1861
1862         OBD_ALLOC(lh, len);
1863         if (lh == NULL)
1864                 return ERR_PTR(-ENOMEM);
1865
1866         lh->kuc_magic = KUC_MAGIC;
1867         lh->kuc_transport = transport;
1868         lh->kuc_msgtype = type;
1869         lh->kuc_msglen = len;
1870
1871         return (void *)(lh + 1);
1872 }
1873 EXPORT_SYMBOL(kuc_alloc);
1874
/* Free a message obtained from kuc_alloc().
 * @param p Pointer to payload area (not to the header)
 * @param payload_len Payload size; the freed length is kuc_len() of it
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *hdr = kuc_ptr(p);

        OBD_FREE(hdr, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1882
1883 struct obd_request_slot_waiter {
1884         struct list_head        orsw_entry;
1885         wait_queue_head_t       orsw_waitq;
1886         bool                    orsw_signaled;
1887 };
1888
1889 static bool obd_request_slot_avail(struct client_obd *cli,
1890                                    struct obd_request_slot_waiter *orsw)
1891 {
1892         bool avail;
1893
1894         spin_lock(&cli->cl_loi_list_lock);
1895         avail = !!list_empty(&orsw->orsw_entry);
1896         spin_unlock(&cli->cl_loi_list_lock);
1897
1898         return avail;
1899 };
1900
1901 /*
1902  * For network flow control, the RPC sponsor needs to acquire a credit
1903  * before sending the RPC. The credits count for a connection is defined
1904  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1905  * the subsequent RPC sponsors need to wait until others released their
1906  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1907  */
1908 int obd_get_request_slot(struct client_obd *cli)
1909 {
1910         struct obd_request_slot_waiter   orsw;
1911         int                              rc;
1912
1913         spin_lock(&cli->cl_loi_list_lock);
1914         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
1915                 cli->cl_rpcs_in_flight++;
1916                 spin_unlock(&cli->cl_loi_list_lock);
1917                 return 0;
1918         }
1919
1920         init_waitqueue_head(&orsw.orsw_waitq);
1921         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
1922         orsw.orsw_signaled = false;
1923         spin_unlock(&cli->cl_loi_list_lock);
1924
1925         rc = l_wait_event_abortable(orsw.orsw_waitq,
1926                                     obd_request_slot_avail(cli, &orsw) ||
1927                                     orsw.orsw_signaled);
1928
1929         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1930          * freed but other (such as obd_put_request_slot) is using it. */
1931         spin_lock(&cli->cl_loi_list_lock);
1932         if (rc != 0) {
1933                 if (!orsw.orsw_signaled) {
1934                         if (list_empty(&orsw.orsw_entry))
1935                                 cli->cl_rpcs_in_flight--;
1936                         else
1937                                 list_del(&orsw.orsw_entry);
1938                 }
1939                 rc = -EINTR;
1940         }
1941
1942         if (orsw.orsw_signaled) {
1943                 LASSERT(list_empty(&orsw.orsw_entry));
1944
1945                 rc = -EINTR;
1946         }
1947         spin_unlock(&cli->cl_loi_list_lock);
1948
1949         return rc;
1950 }
1951 EXPORT_SYMBOL(obd_get_request_slot);
1952
1953 void obd_put_request_slot(struct client_obd *cli)
1954 {
1955         struct obd_request_slot_waiter *orsw;
1956
1957         spin_lock(&cli->cl_loi_list_lock);
1958         cli->cl_rpcs_in_flight--;
1959
1960         /* If there is free slot, wakeup the first waiter. */
1961         if (!list_empty(&cli->cl_flight_waiters) &&
1962             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
1963                 orsw = list_first_entry(&cli->cl_flight_waiters,
1964                                         struct obd_request_slot_waiter,
1965                                         orsw_entry);
1966                 list_del_init(&orsw->orsw_entry);
1967                 cli->cl_rpcs_in_flight++;
1968                 wake_up(&orsw->orsw_waitq);
1969         }
1970         spin_unlock(&cli->cl_loi_list_lock);
1971 }
1972 EXPORT_SYMBOL(obd_put_request_slot);
1973
1974 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1975 {
1976         return cli->cl_max_rpcs_in_flight;
1977 }
1978 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
1979
1980 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1981 {
1982         struct obd_request_slot_waiter *orsw;
1983         __u32                           old;
1984         int                             diff;
1985         int                             i;
1986         int                             rc;
1987
1988         if (max > OBD_MAX_RIF_MAX || max < 1)
1989                 return -ERANGE;
1990
1991         CDEBUG(D_INFO, "%s: max = %u max_mod = %u rif = %u\n",
1992                cli->cl_import->imp_obd->obd_name, max,
1993                cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
1994
1995         if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
1996                    LUSTRE_MDC_NAME) == 0) {
1997                 /* adjust max_mod_rpcs_in_flight to ensure it is always
1998                  * strictly lower that max_rpcs_in_flight */
1999                 if (max < 2) {
2000                         CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2001                                cli->cl_import->imp_obd->obd_name);
2002                         return -ERANGE;
2003                 }
2004                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2005                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2006                         if (rc != 0)
2007                                 return rc;
2008                 }
2009         }
2010
2011         spin_lock(&cli->cl_loi_list_lock);
2012         old = cli->cl_max_rpcs_in_flight;
2013         cli->cl_max_rpcs_in_flight = max;
2014         client_adjust_max_dirty(cli);
2015
2016         diff = max - old;
2017
2018         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2019         for (i = 0; i < diff; i++) {
2020                 orsw = list_first_entry_or_null(&cli->cl_loi_read_list,
2021                                                 struct obd_request_slot_waiter,
2022                                                 orsw_entry);
2023                 if (!orsw)
2024                         break;
2025
2026                 list_del_init(&orsw->orsw_entry);
2027                 cli->cl_rpcs_in_flight++;
2028                 wake_up(&orsw->orsw_waitq);
2029         }
2030         spin_unlock(&cli->cl_loi_list_lock);
2031
2032         return 0;
2033 }
2034 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2035
2036 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2037 {
2038         return cli->cl_max_mod_rpcs_in_flight;
2039 }
2040 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2041
2042 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2043 {
2044         struct obd_connect_data *ocd;
2045         __u16 maxmodrpcs;
2046         __u16 prev;
2047
2048         if (max > OBD_MAX_RIF_MAX || max < 1)
2049                 return -ERANGE;
2050
2051         ocd = &cli->cl_import->imp_connect_data;
2052         CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
2053                cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
2054                ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
2055
2056         if (max == OBD_MAX_RIF_MAX)
2057                 max = OBD_MAX_RIF_MAX - 1;
2058
2059         /* Cannot exceed or equal max_rpcs_in_flight.  If we are asked to
2060          * increase this value, also bump up max_rpcs_in_flight to match.
2061          */
2062         if (max >= cli->cl_max_rpcs_in_flight) {
2063                 CDEBUG(D_INFO,
2064                        "%s: increasing max_rpcs_in_flight=%u to allow larger max_mod_rpcs_in_flight=%u\n",
2065                        cli->cl_import->imp_obd->obd_name, max + 1, max);
2066                 obd_set_max_rpcs_in_flight(cli, max + 1);
2067         }
2068
2069         /* cannot exceed max modify RPCs in flight supported by the server,
2070          * but verify ocd_connect_flags is at least initialized first.  If
2071          * not, allow it and fix value later in ptlrpc_connect_set_flags().
2072          */
2073         if (!ocd->ocd_connect_flags) {
2074                 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2075         } else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
2076                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2077                 if (maxmodrpcs == 0) { /* connection not finished yet */
2078                         maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2079                         CDEBUG(D_INFO,
2080                                "%s: partial connect, assume maxmodrpcs=%hu\n",
2081                                cli->cl_import->imp_obd->obd_name, maxmodrpcs);
2082                 }
2083         } else {
2084                 maxmodrpcs = 1;
2085         }
2086         if (max > maxmodrpcs) {
2087                 CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than mdt.*.max_mod_rpcs_in_flight=%hu returned by the MDT server at connection.\n",
2088                        cli->cl_import->imp_obd->obd_name,
2089                        max, maxmodrpcs);
2090                 return -ERANGE;
2091         }
2092
2093         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2094
2095         prev = cli->cl_max_mod_rpcs_in_flight;
2096         cli->cl_max_mod_rpcs_in_flight = max;
2097
2098         /* wakeup waiters if limit has been increased */
2099         if (cli->cl_max_mod_rpcs_in_flight > prev)
2100                 wake_up_locked(&cli->cl_mod_rpcs_waitq);
2101
2102         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2103
2104         return 0;
2105 }
2106 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2107
2108 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2109                                struct seq_file *seq)
2110 {
2111         unsigned long mod_tot = 0, mod_cum;
2112         int i;
2113
2114         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2115         lprocfs_stats_header(seq, ktime_get_real(), cli->cl_mod_rpcs_init, 25,
2116                              ":", true, "");
2117         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2118                    cli->cl_mod_rpcs_in_flight);
2119
2120         seq_printf(seq, "\n\t\t\tmodify\n");
2121         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2122
2123         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2124
2125         mod_cum = 0;
2126         for (i = 0; i < OBD_HIST_MAX; i++) {
2127                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2128
2129                 mod_cum += mod;
2130                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2131                            i, mod, pct(mod, mod_tot),
2132                            pct(mod_cum, mod_tot));
2133                 if (mod_cum == mod_tot)
2134                         break;
2135         }
2136
2137         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2138
2139         return 0;
2140 }
2141 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2142
2143 /* The number of modify RPCs sent in parallel is limited
2144  * because the server has a finite number of slots per client to
2145  * store request result and ensure reply reconstruction when needed.
2146  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2147  * that takes into account server limit and cl_max_rpcs_in_flight
2148  * value.
2149  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2150  * one close request is allowed above the maximum.
2151  */
2152 struct mod_waiter {
2153         struct client_obd *cli;
2154         bool close_req;
2155         bool woken;
2156         wait_queue_entry_t wqe;
2157 };
2158 static int claim_mod_rpc_function(wait_queue_entry_t *wq_entry,
2159                                   unsigned int mode, int flags, void *key)
2160 {
2161         struct mod_waiter *w = container_of(wq_entry, struct mod_waiter, wqe);
2162         struct client_obd *cli = w->cli;
2163         bool close_req = w->close_req;
2164         bool avail;
2165         int ret;
2166
2167         /* As woken_wake_function() doesn't remove us from the wait_queue,
2168          * we use own flag to ensure we're called just once.
2169          */
2170         if (w->woken)
2171                 return 0;
2172
2173         /* A slot is available if
2174          * - number of modify RPCs in flight is less than the max
2175          * - it's a close RPC and no other close request is in flight
2176          */
2177         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2178                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2179         if (avail) {
2180                 cli->cl_mod_rpcs_in_flight++;
2181                 if (w->close_req)
2182                         cli->cl_close_rpcs_in_flight++;
2183                 ret = woken_wake_function(wq_entry, mode, flags, key);
2184                 w->woken = true;
2185         } else if (cli->cl_close_rpcs_in_flight)
2186                 /* No other waiter could be woken */
2187                 ret = -1;
2188         else if (key == NULL)
2189                 /* This was not a wakeup from a close completion, so there is no
2190                  * point seeing if there are close waiters to be woken
2191                  */
2192                 ret = -1;
2193         else
2194                 /* There might be be a close we could wake, keep looking */
2195                 ret = 0;
2196         return ret;
2197 }
2198
2199 /* Get a modify RPC slot from the obd client @cli according
2200  * to the kind of operation @opc that is going to be sent
2201  * and the intent @it of the operation if it applies.
2202  * If the maximum number of modify RPCs in flight is reached
2203  * the thread is put to sleep.
2204  * Returns the tag to be set in the request message. Tag 0
2205  * is reserved for non-modifying requests.
2206  */
2207 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2208 {
2209         struct mod_waiter wait = {
2210                 .cli = cli,
2211                 .close_req = (opc == MDS_CLOSE),
2212                 .woken = false,
2213         };
2214         __u16                   i, max;
2215
2216         init_wait(&wait.wqe);
2217         wait.wqe.func = claim_mod_rpc_function;
2218
2219         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2220         __add_wait_queue_entry_tail(&cli->cl_mod_rpcs_waitq, &wait.wqe);
2221         /* This wakeup will only succeed if the maximums haven't
2222          * been reached.  If that happens, wait.woken will be set
2223          * and there will be no need to wait.
2224          */
2225         wake_up_locked(&cli->cl_mod_rpcs_waitq);
2226         while (wait.woken == false) {
2227                 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2228                 wait_woken(&wait.wqe, TASK_UNINTERRUPTIBLE,
2229                            MAX_SCHEDULE_TIMEOUT);
2230                 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2231         }
2232         __remove_wait_queue(&cli->cl_mod_rpcs_waitq, &wait.wqe);
2233
2234         max = cli->cl_max_mod_rpcs_in_flight;
2235         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2236                          cli->cl_mod_rpcs_in_flight);
2237         /* find a free tag */
2238         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2239                                 max + 1);
2240         LASSERT(i < OBD_MAX_RIF_MAX);
2241         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2242         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2243         /* tag 0 is reserved for non-modify RPCs */
2244
2245         CDEBUG(D_RPCTRACE,
2246                "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2247                cli->cl_import->imp_obd->obd_name,
2248                i + 1, opc, max);
2249
2250         return i + 1;
2251 }
2252 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2253
2254 /* Put a modify RPC slot from the obd client @cli according
2255  * to the kind of operation @opc that has been sent.
2256  */
2257 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2258 {
2259         bool                    close_req = false;
2260
2261         if (tag == 0)
2262                 return;
2263
2264         if (opc == MDS_CLOSE)
2265                 close_req = true;
2266
2267         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2268         cli->cl_mod_rpcs_in_flight--;
2269         if (close_req)
2270                 cli->cl_close_rpcs_in_flight--;
2271         /* release the tag in the bitmap */
2272         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2273         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2274         __wake_up_locked_key(&cli->cl_mod_rpcs_waitq, TASK_NORMAL,
2275                              (void *)close_req);
2276         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2277 }
2278 EXPORT_SYMBOL(obd_put_mod_rpc_slot);