Whamcloud - gitweb
0118971414c0e3514f85e98ab6e52dd36817e9df
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/obdclass/genops.c
32  *
33  * These are the only exported functions, they provide some generic
34  * infrastructure for managing object devices
35  */
36
37 #define DEBUG_SUBSYSTEM S_CLASS
38
39 #include <linux/pid_namespace.h>
40 #include <linux/workqueue.h>
41 #include <lustre_compat.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
47
48 DEFINE_XARRAY_ALLOC(obd_devs);
49 EXPORT_SYMBOL(obd_devs);
50
51 static atomic_t obd_devs_count = ATOMIC_INIT(0);
52
53 static struct kmem_cache *obd_device_cachep;
54 static struct kobj_type class_ktype;
55 static struct workqueue_struct *zombie_wq;
56
57 static void obd_zombie_export_add(struct obd_export *exp);
58 static void obd_zombie_import_add(struct obd_import *imp);
59 static void print_export_data(struct obd_export *exp,
60                               const char *status, int locks, int debug_level);
61
62 static LIST_HEAD(obd_stale_exports);
63 static DEFINE_SPINLOCK(obd_stale_export_lock);
64 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
65
66 static struct obd_device *obd_device_alloc(void)
67 {
68         struct obd_device *obd;
69
70         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
71         if (obd != NULL)
72                 obd->obd_magic = OBD_DEVICE_MAGIC;
73         return obd;
74 }
75
76 static void obd_device_free(struct obd_device *obd)
77 {
78         LASSERT(obd != NULL);
79         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
80                  "obd %px obd_magic %08x != %08x\n",
81                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
82         if (obd->obd_namespace != NULL) {
83                 CERROR("obd %px: namespace %px was not properly cleaned up (obd_force=%d)!\n",
84                        obd, obd->obd_namespace, obd->obd_force);
85                 LBUG();
86         }
87         lu_ref_fini(&obd->obd_reference);
88         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
89 }
90
91 struct obd_type *class_search_type(const char *name)
92 {
93         struct kobject *kobj = kset_find_obj(lustre_kset, name);
94
95         if (kobj && kobj->ktype == &class_ktype)
96                 return container_of(kobj, struct obd_type, typ_kobj);
97
98         kobject_put(kobj);
99         return NULL;
100 }
101 EXPORT_SYMBOL(class_search_type);
102
103 struct obd_type *class_get_type(const char *name)
104 {
105         struct obd_type *type;
106
107         type = class_search_type(name);
108 #ifdef HAVE_MODULE_LOADING_SUPPORT
109         if (!type) {
110                 const char *modname = name;
111
112 #ifdef HAVE_SERVER_SUPPORT
113                 if (strcmp(modname, "obdfilter") == 0)
114                         modname = "ofd";
115
116                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
117                         modname = LUSTRE_OSP_NAME;
118
119                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
120                         modname = LUSTRE_MDT_NAME;
121 #endif /* HAVE_SERVER_SUPPORT */
122
123                 if (!request_module("%s", modname)) {
124                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
125                         type = class_search_type(name);
126                 } else {
127                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
128                                            modname);
129                 }
130         }
131 #endif
132         if (type) {
133                 if (try_module_get(type->typ_dt_ops->o_owner)) {
134                         atomic_inc(&type->typ_refcnt);
135                         /* class_search_type() returned a counted ref, this
136                          * count not needed as we could get it via typ_refcnt
137                          */
138                         kobject_put(&type->typ_kobj);
139                 } else {
140                         kobject_put(&type->typ_kobj);
141                         type = NULL;
142                 }
143         }
144         return type;
145 }
146 EXPORT_SYMBOL(class_get_type);
147
148 void class_put_type(struct obd_type *type)
149 {
150         LASSERT(type);
151         module_put(type->typ_dt_ops->o_owner);
152         atomic_dec(&type->typ_refcnt);
153 }
154 EXPORT_SYMBOL(class_put_type);
155
156 static void class_sysfs_release(struct kobject *kobj)
157 {
158         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
159
160         debugfs_remove_recursive(type->typ_debugfs_entry);
161         type->typ_debugfs_entry = NULL;
162
163         if (type->typ_lu)
164                 lu_device_type_fini(type->typ_lu);
165
166 #ifdef CONFIG_PROC_FS
167         if (type->typ_name && type->typ_procroot)
168                 remove_proc_subtree(type->typ_name, proc_lustre_root);
169 #endif
170         OBD_FREE(type, sizeof(*type));
171 }
172
173 static struct kobj_type class_ktype = {
174         .sysfs_ops      = &lustre_sysfs_ops,
175         .release        = class_sysfs_release,
176 };
177
#ifdef HAVE_SERVER_SUPPORT
/**
 * Create a placeholder obd_type that only provides the sysfs/debugfs
 * (and optionally proc) directories for \a name.
 *
 * The new type is flagged with typ_sym_filter so that a later
 * class_register_type() for the same name adopts it instead of failing
 * with -EEXIST.
 *
 * \param[in] name         name of the type / directories to create
 * \param[in] enable_proc  also register a compat proc directory; failure
 *                         to do so is logged but is not fatal
 *
 * \retval type               pointer to the new obd_type
 * \retval ERR_PTR(-EEXIST)   a type with this name already exists
 * \retval ERR_PTR(-ENOMEM)   allocation failure
 * \retval ERR_PTR(errno)     kobject registration failed
 */
struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
{
        struct dentry *symlink;
        struct obd_type *type;
        int rc;

        type = class_search_type(name);
        if (type) {
                /* drop the reference returned by class_search_type() */
                kobject_put(&type->typ_kobj);
                return ERR_PTR(-EEXIST);
        }

        OBD_ALLOC(type, sizeof(*type));
        if (!type)
                return ERR_PTR(-ENOMEM);

        type->typ_kobj.kset = lustre_kset;
        rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
                                  &lustre_kset->kobj, "%s", name);
        if (rc) {
                /* The kobject is initialised even when adding it failed,
                 * so it has to be released with kobject_put(); that ends
                 * in class_sysfs_release(), which frees \a type.  Simply
                 * returning here would leak the allocation.
                 */
                kobject_put(&type->typ_kobj);
                return ERR_PTR(rc);
        }

        symlink = debugfs_create_dir(name, debugfs_lustre_root);
        type->typ_debugfs_entry = symlink;
        type->typ_sym_filter = true;

        if (enable_proc) {
                type->typ_procroot = lprocfs_register(name, proc_lustre_root,
                                                      NULL, NULL);
                if (IS_ERR(type->typ_procroot)) {
                        CERROR("%s: can't create compat proc entry: %d\n",
                               name, (int)PTR_ERR(type->typ_procroot));
                        /* keep going without the proc directory */
                        type->typ_procroot = NULL;
                }
        }

        return type;
}
EXPORT_SYMBOL(class_add_symlinks);
#endif /* HAVE_SERVER_SUPPORT */
219
/* upper bound (exclusive) on the length of a type name, see the
 * sanity check in class_register_type()
 */
#define CLASS_MAX_NAME 1024

/**
 * Register an obd type under \a name.
 *
 * A new obd_type is allocated and published in lustre_kset, unless
 * (with HAVE_SERVER_SUPPORT) a placeholder created by
 * class_add_symlinks() already exists, in which case that placeholder
 * is adopted and only its operations and lu_device_type are filled in.
 *
 * \param[in] dt_ops       obd operations stored in typ_dt_ops
 * \param[in] md_ops       metadata operations stored in typ_md_ops
 * \param[in] enable_proc  register a proc directory for the type
 *                         (only when CONFIG_PROC_FS is set)
 * \param[in] name         type name, shorter than CLASS_MAX_NAME
 * \param[in] ldt          optional lu_device_type to initialise
 *
 * \retval 0        success
 * \retval -EEXIST  a (non-placeholder) type of this name exists
 * \retval -ENOMEM  allocation failure
 * \retval errno    proc/kobject registration or lu_device_type_init()
 *                  failed
 */
int class_register_type(const struct obd_ops *dt_ops,
                        const struct md_ops *md_ops,
                        bool enable_proc,
                        const char *name, struct lu_device_type *ldt)
{
        struct obd_type *type;
        int rc;

        ENTRY;
        /* sanity check */
        LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);

        /* on a hit this holds a kobject reference that must be dropped */
        type = class_search_type(name);
        if (type) {
#ifdef HAVE_SERVER_SUPPORT
                /* placeholder from class_add_symlinks(): reuse it, the
                 * lookup reference is dropped further down
                 */
                if (type->typ_sym_filter)
                        goto dir_exist;
#endif /* HAVE_SERVER_SUPPORT */
                kobject_put(&type->typ_kobj);
                CDEBUG(D_IOCTL, "Type %s already registered\n", name);
                RETURN(-EEXIST);
        }

        OBD_ALLOC(type, sizeof(*type));
        if (type == NULL)
                RETURN(-ENOMEM);

        /* OBD_LU_TYPE_SETUP is a temporary marker; the final value is
         * published with smp_store_release() once ldt is initialised
         */
        type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
        type->typ_kobj.kset = lustre_kset;
        kobject_init(&type->typ_kobj, &class_ktype);
#ifdef HAVE_SERVER_SUPPORT
dir_exist:
#endif /* HAVE_SERVER_SUPPORT */

        type->typ_dt_ops = dt_ops;
        type->typ_md_ops = md_ops;

#ifdef HAVE_SERVER_SUPPORT
        if (type->typ_sym_filter) {
                /* adopting a placeholder: it stops being one, the
                 * reference from class_search_type() is released, and the
                 * proc/debugfs/kobject registration below is skipped
                 * (class_add_symlinks() performs those steps itself)
                 */
                type->typ_sym_filter = false;
                kobject_put(&type->typ_kobj);
                goto setup_ldt;
        }
#endif
#ifdef CONFIG_PROC_FS
        if (enable_proc && !type->typ_procroot) {
                type->typ_procroot = lprocfs_register(name,
                                                      proc_lustre_root,
                                                      NULL, type);
                if (IS_ERR(type->typ_procroot)) {
                        rc = PTR_ERR(type->typ_procroot);
                        /* never leave an ERR_PTR behind for the release
                         * callback to test
                         */
                        type->typ_procroot = NULL;
                        GOTO(failed, rc);
                }
        }
#endif
        /* the result is stored without being checked for errors */
        type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);

        rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
        if (rc)
                GOTO(failed, rc);
#ifdef HAVE_SERVER_SUPPORT
setup_ldt:
#endif
        if (ldt) {
                rc = lu_device_type_init(ldt);
                /* publish the outcome (ldt, or NULL on failure) and wake
                 * anybody waiting on typ_lu -- presumably waiters that saw
                 * OBD_LU_TYPE_SETUP; they are not visible in this file
                 * section
                 */
                smp_store_release(&type->typ_lu, rc ? NULL : ldt);
                wake_up_var(&type->typ_lu);
                if (rc)
                        GOTO(failed, rc);
        }

        RETURN(0);

failed:
        /* drop the type's reference; when it is the last one,
         * class_sysfs_release() cleans up and frees the type
         */
        kobject_put(&type->typ_kobj);

        RETURN(rc);
}
EXPORT_SYMBOL(class_register_type);
302
303 int class_unregister_type(const char *name)
304 {
305         struct obd_type *type = class_search_type(name);
306         int rc = 0;
307
308         ENTRY;
309
310         if (!type) {
311                 CERROR("unknown obd type\n");
312                 RETURN(-EINVAL);
313         }
314
315         if (atomic_read(&type->typ_refcnt)) {
316                 CERROR("type %s has refcount (%d)\n", name,
317                        atomic_read(&type->typ_refcnt));
318                 /* This is a bad situation, let's make the best of it */
319                 /* Remove ops, but leave the name for debugging */
320                 type->typ_dt_ops = NULL;
321                 type->typ_md_ops = NULL;
322                 GOTO(out_put, rc = -EBUSY);
323         }
324
325         /* Put the final ref */
326         kobject_put(&type->typ_kobj);
327 out_put:
328         /* Put the ref returned by class_search_type() */
329         kobject_put(&type->typ_kobj);
330
331         RETURN(rc);
332 } /* class_unregister_type */
333 EXPORT_SYMBOL(class_unregister_type);
334
335 /**
336  * Create a new obd device.
337  *
338  * Allocate the new obd_device and initialize it.
339  *
340  * \param[in] type_name obd device type string.
341  * \param[in] name      obd device name.
342  * \param[in] uuid      obd device UUID
343  *
344  * \retval newdev         pointer to created obd_device
345  * \retval ERR_PTR(errno) on error
346  */
347 struct obd_device *class_newdev(const char *type_name, const char *name,
348                                 const char *uuid)
349 {
350         struct obd_device *newdev;
351         struct obd_type *type = NULL;
352
353         ENTRY;
354
355         if (strlen(name) >= MAX_OBD_NAME) {
356                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
357                 RETURN(ERR_PTR(-EINVAL));
358         }
359
360         type = class_get_type(type_name);
361         if (type == NULL) {
362                 CERROR("OBD: unknown type: %s\n", type_name);
363                 RETURN(ERR_PTR(-ENODEV));
364         }
365
366         newdev = obd_device_alloc();
367         if (newdev == NULL) {
368                 class_put_type(type);
369                 RETURN(ERR_PTR(-ENOMEM));
370         }
371         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
372         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
373         newdev->obd_type = type;
374         newdev->obd_minor = -1;
375
376         rwlock_init(&newdev->obd_pool_lock);
377         newdev->obd_pool_limit = 0;
378         newdev->obd_pool_slv = 0;
379
380         INIT_LIST_HEAD(&newdev->obd_exports);
381         newdev->obd_num_exports = 0;
382         newdev->obd_grant_check_threshold = 100;
383         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
384         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
385         INIT_LIST_HEAD(&newdev->obd_exports_timed);
386         INIT_LIST_HEAD(&newdev->obd_nid_stats);
387         spin_lock_init(&newdev->obd_nid_lock);
388         spin_lock_init(&newdev->obd_dev_lock);
389         mutex_init(&newdev->obd_dev_mutex);
390         spin_lock_init(&newdev->obd_osfs_lock);
391         /* newdev->obd_osfs_age must be set to a value in the distant
392          * past to guarantee a fresh statfs is fetched on mount.
393          */
394         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
395
396         /* XXX belongs in setup not attach  */
397         init_rwsem(&newdev->obd_observer_link_sem);
398         /* recovery data */
399         spin_lock_init(&newdev->obd_recovery_task_lock);
400         init_waitqueue_head(&newdev->obd_next_transno_waitq);
401         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
402         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
403         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
404         INIT_LIST_HEAD(&newdev->obd_evict_list);
405         INIT_LIST_HEAD(&newdev->obd_lwp_list);
406
407         llog_group_init(&newdev->obd_olg);
408         /* Detach drops this */
409         kref_init(&newdev->obd_refcount);
410         lu_ref_init(&newdev->obd_reference);
411         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
412
413         newdev->obd_conn_inprogress = 0;
414
415         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
416
417         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
418                newdev->obd_name, newdev);
419
420         return newdev;
421 }
422
423 /**
424  * Free obd device.
425  *
426  * \param[in] obd obd_device to be freed
427  *
428  * \retval none
429  */
430 void class_free_dev(struct obd_device *obd)
431 {
432         struct obd_type *obd_type = obd->obd_type;
433
434         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
435                  "%px obd_magic %08x != %08x\n",
436                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
437         LASSERTF(obd->obd_minor == -1 || class_num2obd(obd->obd_minor) == obd,
438                  "obd %px != obd_devs[%d] %px\n",
439                  obd, obd->obd_minor, class_num2obd(obd->obd_minor));
440         LASSERTF(kref_read(&obd->obd_refcount) == 0,
441                  "obd_refcount should be 0, not %d\n",
442                  kref_read(&obd->obd_refcount));
443         LASSERT(obd_type != NULL);
444
445         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
446                obd->obd_name, obd->obd_type->typ_name);
447
448         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
449                          obd->obd_name, obd->obd_uuid.uuid);
450         if (obd->obd_stopping) {
451                 int err;
452
453                 /* If we're not stopping, we were never set up */
454                 err = obd_cleanup(obd);
455                 if (err)
456                         CERROR("Cleanup %s returned %d\n",
457                                 obd->obd_name, err);
458         }
459
460         obd_device_free(obd);
461
462         class_put_type(obd_type);
463 }
464
465 /**
466  * Unregister obd device.
467  *
468  * Remove an obd from obd_dev
469  *
470  * \param[in] new_obd obd_device to be unregistered
471  *
472  * \retval none
473  */
474 void class_unregister_device(struct obd_device *obd)
475 {
476         if (obd->obd_minor >= 0) {
477                 xa_erase(&obd_devs, obd->obd_minor);
478                 class_decref(obd, "obd_device_list", obd);
479                 obd->obd_minor = -1;
480                 atomic_dec(&obd_devs_count);
481         }
482 }
483
484 /**
485  * Register obd device.
486  *
487  * Add new_obd to obd_devs
488  *
489  * \param[in] new_obd obd_device to be registered
490  *
491  * \retval 0          success
492  * \retval -EEXIST    device with this name is registered
493  */
494 int class_register_device(struct obd_device *new_obd)
495 {
496         int rc = 0;
497         int dev_no = 0;
498
499         if (new_obd == NULL) {
500                 rc = -1;
501                 goto out;
502         }
503
504         /* obd_device waiting to be destroyed by "obd_zombie_impexp_thread" */
505         if (class_name2dev(new_obd->obd_name) != -1)
506                 obd_zombie_barrier();
507
508         if (class_name2dev(new_obd->obd_name) == -1) {
509                 class_incref(new_obd, "obd_device_list", new_obd);
510                 rc = xa_alloc(&obd_devs, &dev_no, new_obd,
511                               xa_limit_31b, GFP_ATOMIC);
512
513                 if (rc != 0)
514                         goto out;
515
516                 new_obd->obd_minor = dev_no;
517                 atomic_inc(&obd_devs_count);
518         } else {
519                 rc = -EEXIST;
520         }
521
522 out:
523         RETURN(rc);
524 }
525
526 int class_name2dev(const char *name)
527 {
528         struct obd_device *obd = NULL;
529         unsigned long dev_no = 0;
530         int ret;
531
532         if (!name)
533                 return -1;
534
535         obd_device_lock();
536         obd_device_for_each(dev_no, obd) {
537                 if (strcmp(name, obd->obd_name) == 0) {
538                         /*
539                          * Make sure we finished attaching before we give
540                          * out any references
541                          */
542                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
543                         if (obd->obd_attached) {
544                                 ret = obd->obd_minor;
545                                 obd_device_unlock();
546                                 return ret;
547                         }
548                         break;
549                 }
550         }
551         obd_device_unlock();
552
553         return -1;
554 }
555 EXPORT_SYMBOL(class_name2dev);
556
557 struct obd_device *class_name2obd(const char *name)
558 {
559         struct obd_device *obd = NULL;
560         unsigned long dev_no = 0;
561
562         if (!name)
563                 return NULL;
564
565         obd_device_lock();
566         obd_device_for_each(dev_no, obd) {
567                 if (strcmp(name, obd->obd_name) == 0) {
568                         /*
569                          * Make sure we finished attaching before we give
570                          * out any references
571                          */
572                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
573                         if (obd->obd_attached)
574                                 break;
575                 }
576         }
577         obd_device_unlock();
578
579         /*
580          * TODO: We give out a reference without class_incref(). This isn't
581          * ideal, but this behavior is identical in previous implementations
582          * of this function.
583          */
584         return obd;
585 }
586 EXPORT_SYMBOL(class_name2obd);
587
588 int class_uuid2dev(struct obd_uuid *uuid)
589 {
590         struct obd_device *obd = NULL;
591         unsigned long dev_no = 0;
592         int ret;
593
594         obd_device_lock();
595         obd_device_for_each(dev_no, obd) {
596                 if (obd_uuid_equals(uuid, &obd->obd_uuid)) {
597                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
598                         ret = obd->obd_minor;
599                         obd_device_unlock();
600                         return ret;
601                 }
602         }
603         obd_device_unlock();
604
605         return -1;
606 }
607 EXPORT_SYMBOL(class_uuid2dev);
608
609 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
610 {
611         struct obd_device *obd = NULL;
612         unsigned long dev_no = 0;
613
614         obd_device_lock();
615         obd_device_for_each(dev_no, obd) {
616                 if (obd_uuid_equals(uuid, &obd->obd_uuid)) {
617                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
618                         break;
619                 }
620         }
621         obd_device_unlock();
622
623         /*
624          * TODO: We give out a reference without class_incref(). This isn't
625          * ideal, but this behavior is identical in previous implementations
626          * of this function.
627          */
628         return obd;
629 }
630 EXPORT_SYMBOL(class_uuid2obd);
631
632 struct obd_device *class_num2obd(int dev_no)
633 {
634         return xa_load(&obd_devs, dev_no);
635 }
636 EXPORT_SYMBOL(class_num2obd);
637
638 /**
639  * Find obd by name or uuid.
640  *
641  * Increment obd's refcount if found.
642  *
643  * \param[in] str obd name or uuid
644  *
645  * \retval NULL    if not found
646  * \retval obd     pointer to found obd_device
647  */
648 struct obd_device *class_str2obd(const char *str)
649 {
650         struct obd_device *obd = NULL;
651         struct obd_uuid uuid;
652         unsigned long dev_no = 0;
653
654         obd_str2uuid(&uuid, str);
655
656         obd_device_lock();
657         obd_device_for_each(dev_no, obd) {
658                 if (obd_uuid_equals(&uuid, &obd->obd_uuid) ||
659                     (strcmp(str, obd->obd_name) == 0)) {
660                         /*
661                          * Make sure we finished attaching before we give
662                          * out any references
663                          */
664                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
665                         if (obd->obd_attached) {
666                                 class_incref(obd, "find", current);
667                                 break;
668                         }
669                         obd_device_unlock();
670                         RETURN(NULL);
671                 }
672         }
673         obd_device_unlock();
674
675         RETURN(obd);
676 }
677 EXPORT_SYMBOL(class_str2obd);
678
679 /**
680  * Get obd devices count. Device in any
681  *    state are counted
682  * \retval obd device count
683  */
684 int class_obd_devs_count(void)
685 {
686         return atomic_read(&obd_devs_count);
687 }
688 EXPORT_SYMBOL(class_obd_devs_count);
689
690 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
691  * specified, then only the client with that uuid is returned,
692  * otherwise any client connected to the tgt is returned.
693  */
694 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
695                                          const char *type_name,
696                                          struct obd_uuid *grp_uuid)
697 {
698         struct obd_device *obd = NULL;
699         unsigned long dev_no = 0;
700
701         obd_device_lock();
702         obd_device_for_each(dev_no, obd) {
703                 if ((strncmp(obd->obd_type->typ_name, type_name,
704                              strlen(type_name)) == 0)) {
705                         if (obd_uuid_equals(tgt_uuid,
706                                             &obd->u.cli.cl_target_uuid) &&
707                             ((grp_uuid) ? obd_uuid_equals(grp_uuid,
708                                                          &obd->obd_uuid) : 1)) {
709                                 obd_device_unlock();
710                                 return obd;
711                         }
712                 }
713         }
714         obd_device_unlock();
715
716         return NULL;
717 }
718 EXPORT_SYMBOL(class_find_client_obd);
719
720 /**
721  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
722  * adjust sptlrpc settings accordingly.
723  */
724 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
725 {
726         struct obd_device *obd = NULL;
727         unsigned long dev_no = 0;
728         const char *type;
729         int rc = 0, rc2;
730
731         LASSERT(namelen > 0);
732
733         obd_device_lock();
734         obd_device_for_each(dev_no, obd) {
735                 if (obd->obd_set_up == 0 || obd->obd_stopping)
736                         continue;
737
738                 /* only notify mdc, osc, osp, lwp, mdt, ost
739                  * because only these have a -sptlrpc llog
740                  */
741                 type = obd->obd_type->typ_name;
742                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
743                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
744                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
745                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
746                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
747                     strcmp(type, LUSTRE_OST_NAME) != 0)
748                         continue;
749
750                 if (strncmp(obd->obd_name, fsname, namelen))
751                         continue;
752
753                 class_incref(obd, __func__, obd);
754                 obd_device_unlock();
755                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
756                                          sizeof(KEY_SPTLRPC_CONF),
757                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
758                 rc = rc ? rc : rc2;
759                 obd_device_lock();
760                 class_decref(obd, __func__, obd);
761         }
762         obd_device_unlock();
763
764         return rc;
765 }
766 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
767
768 void obd_cleanup_caches(void)
769 {
770         ENTRY;
771         if (obd_device_cachep) {
772                 kmem_cache_destroy(obd_device_cachep);
773                 obd_device_cachep = NULL;
774         }
775
776         EXIT;
777 }
778
779 int obd_init_caches(void)
780 {
781         int rc;
782
783         ENTRY;
784
785         LASSERT(obd_device_cachep == NULL);
786         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
787                                 sizeof(struct obd_device),
788                                 0, 0, 0, sizeof(struct obd_device), NULL);
789         if (!obd_device_cachep)
790                 GOTO(out, rc = -ENOMEM);
791
792         RETURN(0);
793 out:
794         obd_cleanup_caches();
795         RETURN(rc);
796 }
797
798 static const char export_handle_owner[] = "export";
799
800 /* map connection to client */
801 struct obd_export *class_conn2export(struct lustre_handle *conn)
802 {
803         struct obd_export *export;
804
805         ENTRY;
806
807         if (!conn) {
808                 CDEBUG(D_CACHE, "looking for null handle\n");
809                 RETURN(NULL);
810         }
811
812         if (conn->cookie == -1) {  /* this means assign a new connection */
813                 CDEBUG(D_CACHE, "want a new connection\n");
814                 RETURN(NULL);
815         }
816
817         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
818         export = class_handle2object(conn->cookie, export_handle_owner);
819         RETURN(export);
820 }
821 EXPORT_SYMBOL(class_conn2export);
822
823 struct obd_device *class_exp2obd(struct obd_export *exp)
824 {
825         if (exp)
826                 return exp->exp_obd;
827         return NULL;
828 }
829 EXPORT_SYMBOL(class_exp2obd);
830
831 struct obd_import *class_exp2cliimp(struct obd_export *exp)
832 {
833         struct obd_device *obd = exp->exp_obd;
834
835         if (obd == NULL)
836                 return NULL;
837         return obd->u.cli.cl_import;
838 }
839 EXPORT_SYMBOL(class_exp2cliimp);
840
841 /* Export management functions */
842 static void class_export_destroy(struct obd_export *exp)
843 {
844         struct obd_device *obd = exp->exp_obd;
845
846         ENTRY;
847
848         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
849         LASSERT(obd != NULL);
850
851         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
852                exp->exp_client_uuid.uuid, obd->obd_name);
853
854         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
855         ptlrpc_connection_put(exp->exp_connection);
856
857         LASSERT(list_empty(&exp->exp_outstanding_replies));
858         LASSERT(list_empty(&exp->exp_uncommitted_replies));
859         LASSERT(list_empty(&exp->exp_req_replay_queue));
860         LASSERT(list_empty(&exp->exp_hp_rpcs));
861         obd_destroy_export(exp);
862         /* self export doesn't hold a reference to an obd, although it
863          * exists until freeing of the obd
864          */
865         if (exp != obd->obd_self_export)
866                 class_decref(obd, "export", exp);
867
868         OBD_FREE_PRE(exp, sizeof(*exp), "kfree_rcu");
869         kfree_rcu(exp, exp_handle.h_rcu);
870         EXIT;
871 }
872
873 struct obd_export *class_export_get(struct obd_export *exp)
874 {
875         refcount_inc(&exp->exp_handle.h_ref);
876         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
877                refcount_read(&exp->exp_handle.h_ref));
878         return exp;
879 }
880 EXPORT_SYMBOL(class_export_get);
881
882 void class_export_put(struct obd_export *exp)
883 {
884         LASSERT(exp != NULL);
885         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
886         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
887         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
888                refcount_read(&exp->exp_handle.h_ref) - 1);
889
890         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
891                 struct obd_device *obd = exp->exp_obd;
892
893                 CDEBUG(D_IOCTL, "final put %p/%s\n",
894                        exp, exp->exp_client_uuid.uuid);
895
896                 /* release nid stat refererence */
897                 lprocfs_exp_cleanup(exp);
898
899                 if (exp == obd->obd_self_export) {
900                         /* self export should be destroyed without zombie
901                          * thread as it doesn't hold a reference to obd and
902                          * doesn't hold any resources
903                          */
904                         class_export_destroy(exp);
905                         /* self export is destroyed, no class ref exist and it
906                          * is safe to free obd
907                          */
908                         class_free_dev(obd);
909                 } else {
910                         LASSERT(!list_empty(&exp->exp_obd_chain));
911                         obd_zombie_export_add(exp);
912                 }
913
914         }
915 }
916 EXPORT_SYMBOL(class_export_put);
917
918 static void obd_zombie_exp_cull(struct work_struct *ws)
919 {
920         struct obd_export *export;
921
922         export = container_of(ws, struct obd_export, exp_zombie_work);
923         class_export_destroy(export);
924         LASSERT(atomic_read(&obd_stale_export_num) > 0);
925         if (atomic_dec_and_test(&obd_stale_export_num))
926                 wake_up_var(&obd_stale_export_num);
927 }
928
929 /* Creates a new export, adds it to the hash table, and returns a
930  * pointer to it. The refcount is 2: one for the hash reference, and
931  * one for the pointer returned by this function.
932  */
933 static struct obd_export *__class_new_export(struct obd_device *obd,
934                                              struct obd_uuid *cluuid,
935                                              bool is_self)
936 {
937         struct obd_export *export;
938         int rc = 0;
939
940         ENTRY;
941
942         OBD_ALLOC_PTR(export);
943         if (!export)
944                 return ERR_PTR(-ENOMEM);
945
946         export->exp_conn_cnt = 0;
947         export->exp_lock_hash = NULL;
948         export->exp_flock_hash = NULL;
949         /* 2 = class_handle_hash + last */
950         refcount_set(&export->exp_handle.h_ref, 2);
951         atomic_set(&export->exp_rpc_count, 0);
952         atomic_set(&export->exp_cb_count, 0);
953         atomic_set(&export->exp_locks_count, 0);
954 #if LUSTRE_TRACKS_LOCK_EXP_REFS
955         INIT_LIST_HEAD(&export->exp_locks_list);
956         spin_lock_init(&export->exp_locks_list_guard);
957 #endif
958         atomic_set(&export->exp_replay_count, 0);
959         export->exp_obd = obd;
960         INIT_LIST_HEAD(&export->exp_outstanding_replies);
961         spin_lock_init(&export->exp_uncommitted_replies_lock);
962         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
963         INIT_LIST_HEAD(&export->exp_req_replay_queue);
964         INIT_HLIST_NODE(&export->exp_handle.h_link);
965         INIT_LIST_HEAD(&export->exp_hp_rpcs);
966         INIT_LIST_HEAD(&export->exp_reg_rpcs);
967         class_handle_hash(&export->exp_handle, export_handle_owner);
968         export->exp_last_request_time = ktime_get_real_seconds();
969         spin_lock_init(&export->exp_lock);
970         spin_lock_init(&export->exp_rpc_lock);
971         INIT_HLIST_NODE(&export->exp_gen_hash);
972         spin_lock_init(&export->exp_bl_list_lock);
973         INIT_LIST_HEAD(&export->exp_bl_list);
974         INIT_LIST_HEAD(&export->exp_stale_list);
975         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
976
977         export->exp_sp_peer = LUSTRE_SP_ANY;
978         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
979         export->exp_client_uuid = *cluuid;
980         obd_init_export(export);
981
982         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
983         export->exp_root_fid.f_seq = 0;
984         export->exp_root_fid.f_oid = 0;
985         export->exp_root_fid.f_ver = 0;
986
987         spin_lock(&obd->obd_dev_lock);
988         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
989                 /* shouldn't happen, but might race */
990                 if (obd->obd_stopping)
991                         GOTO(exit_unlock, rc = -ENODEV);
992
993                 rc = obd_uuid_add(obd, export);
994                 if (rc != 0) {
995                         LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
996                                       obd->obd_name, cluuid->uuid, rc);
997                         GOTO(exit_unlock, rc = -EALREADY);
998                 }
999         }
1000
1001         if (!is_self) {
1002                 class_incref(obd, "export", export);
1003                 list_add_tail(&export->exp_obd_chain_timed,
1004                               &obd->obd_exports_timed);
1005                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1006                 obd->obd_num_exports++;
1007         } else {
1008                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1009                 INIT_LIST_HEAD(&export->exp_obd_chain);
1010         }
1011         spin_unlock(&obd->obd_dev_lock);
1012         RETURN(export);
1013
1014 exit_unlock:
1015         spin_unlock(&obd->obd_dev_lock);
1016         class_handle_unhash(&export->exp_handle);
1017         obd_destroy_export(export);
1018         OBD_FREE_PTR(export);
1019         return ERR_PTR(rc);
1020 }
1021
1022 struct obd_export *class_new_export(struct obd_device *obd,
1023                                     struct obd_uuid *uuid)
1024 {
1025         return __class_new_export(obd, uuid, false);
1026 }
1027 EXPORT_SYMBOL(class_new_export);
1028
1029 struct obd_export *class_new_export_self(struct obd_device *obd,
1030                                          struct obd_uuid *uuid)
1031 {
1032         return __class_new_export(obd, uuid, true);
1033 }
1034
1035 void class_unlink_export(struct obd_export *exp)
1036 {
1037         class_handle_unhash(&exp->exp_handle);
1038
1039         if (exp->exp_obd->obd_self_export == exp) {
1040                 class_export_put(exp);
1041                 return;
1042         }
1043
1044         spin_lock(&exp->exp_obd->obd_dev_lock);
1045         /* delete an uuid-export hashitem from hashtables */
1046         if (exp != exp->exp_obd->obd_self_export)
1047                 obd_uuid_del(exp->exp_obd, exp);
1048
1049 #ifdef HAVE_SERVER_SUPPORT
1050         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1051                 struct tg_export_data   *ted = &exp->exp_target_data;
1052                 struct cfs_hash         *hash;
1053
1054                 /* Because obd_gen_hash will not be released until
1055                  * class_cleanup(), so hash should never be NULL here
1056                  */
1057                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1058                 LASSERT(hash != NULL);
1059                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1060                              &exp->exp_gen_hash);
1061                 cfs_hash_putref(hash);
1062         }
1063 #endif /* HAVE_SERVER_SUPPORT */
1064
1065         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1066         list_del_init(&exp->exp_obd_chain_timed);
1067         exp->exp_obd->obd_num_exports--;
1068         spin_unlock(&exp->exp_obd->obd_dev_lock);
1069
1070         /* A reference is kept by obd_stale_exports list */
1071         obd_stale_export_put(exp);
1072 }
1073 EXPORT_SYMBOL(class_unlink_export);
1074
1075 /* Import management functions */
1076 static void obd_zombie_import_free(struct obd_import *imp)
1077 {
1078         struct obd_import_conn *imp_conn;
1079
1080         ENTRY;
1081         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1082                imp->imp_obd->obd_name);
1083
1084         LASSERT(refcount_read(&imp->imp_refcount) == 0);
1085
1086         ptlrpc_connection_put(imp->imp_connection);
1087
1088         while ((imp_conn = list_first_entry_or_null(&imp->imp_conn_list,
1089                                                     struct obd_import_conn,
1090                                                     oic_item)) != NULL) {
1091                 list_del_init(&imp_conn->oic_item);
1092                 ptlrpc_connection_put(imp_conn->oic_conn);
1093                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1094         }
1095
1096         LASSERT(imp->imp_sec == NULL);
1097         LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1098                  imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1099         class_decref(imp->imp_obd, "import", imp);
1100         OBD_FREE_PTR(imp);
1101         EXIT;
1102 }
1103
1104 struct obd_import *class_import_get(struct obd_import *import)
1105 {
1106         refcount_inc(&import->imp_refcount);
1107         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1108                refcount_read(&import->imp_refcount),
1109                import->imp_obd->obd_name);
1110         return import;
1111 }
1112 EXPORT_SYMBOL(class_import_get);
1113
1114 void class_import_put(struct obd_import *imp)
1115 {
1116         ENTRY;
1117
1118         LASSERT(refcount_read(&imp->imp_refcount) > 0);
1119
1120         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1121                refcount_read(&imp->imp_refcount) - 1,
1122                imp->imp_obd->obd_name);
1123
1124         if (refcount_dec_and_test(&imp->imp_refcount)) {
1125                 CDEBUG(D_INFO, "final put import %p\n", imp);
1126                 obd_zombie_import_add(imp);
1127         }
1128
1129         EXIT;
1130 }
1131 EXPORT_SYMBOL(class_import_put);
1132
1133 static void init_imp_at(struct imp_at *at)
1134 {
1135         int i;
1136
1137         at_init(&at->iat_net_latency, 0, 0);
1138         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1139                 /* max service estimates are tracked server side, so dont't
1140                  * use AT history here, just use the last reported val. (But
1141                  * keep hist for proc histogram, worst_ever)
1142                  */
1143                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1144                         AT_FLG_NOHIST);
1145         }
1146 }
1147
1148 static void obd_zombie_imp_cull(struct work_struct *ws)
1149 {
1150         struct obd_import *import;
1151
1152         import = container_of(ws, struct obd_import, imp_zombie_work);
1153         obd_zombie_import_free(import);
1154 }
1155
1156 struct obd_import *class_new_import(struct obd_device *obd)
1157 {
1158         struct obd_import *imp;
1159         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1160
1161         OBD_ALLOC(imp, sizeof(*imp));
1162         if (imp == NULL)
1163                 return NULL;
1164
1165         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1166         INIT_LIST_HEAD(&imp->imp_replay_list);
1167         INIT_LIST_HEAD(&imp->imp_sending_list);
1168         INIT_LIST_HEAD(&imp->imp_delayed_list);
1169         INIT_LIST_HEAD(&imp->imp_committed_list);
1170         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1171         imp->imp_known_replied_xid = 0;
1172         imp->imp_replay_cursor = &imp->imp_committed_list;
1173         spin_lock_init(&imp->imp_lock);
1174         imp->imp_last_success_conn = 0;
1175         imp->imp_state = LUSTRE_IMP_NEW;
1176         imp->imp_obd = class_incref(obd, "import", imp);
1177         rwlock_init(&imp->imp_sec_lock);
1178         init_waitqueue_head(&imp->imp_recovery_waitq);
1179         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1180
1181         if (curr_pid_ns && curr_pid_ns->child_reaper)
1182                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1183         else
1184                 imp->imp_sec_refpid = 1;
1185
1186         refcount_set(&imp->imp_refcount, 2);
1187         atomic_set(&imp->imp_unregistering, 0);
1188         atomic_set(&imp->imp_reqs, 0);
1189         atomic_set(&imp->imp_inflight, 0);
1190         atomic_set(&imp->imp_replay_inflight, 0);
1191         init_waitqueue_head(&imp->imp_replay_waitq);
1192         atomic_set(&imp->imp_inval_count, 0);
1193         atomic_set(&imp->imp_waiting, 0);
1194         INIT_LIST_HEAD(&imp->imp_conn_list);
1195         init_imp_at(&imp->imp_at);
1196
1197         /* the default magic is V2, will be used in connect RPC, and
1198          * then adjusted according to the flags in request/reply.
1199          */
1200         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1201
1202         return imp;
1203 }
1204 EXPORT_SYMBOL(class_new_import);
1205
1206 void class_destroy_import(struct obd_import *import)
1207 {
1208         LASSERT(import != NULL);
1209         LASSERT(import != LP_POISON);
1210
1211         spin_lock(&import->imp_lock);
1212         import->imp_generation++;
1213         spin_unlock(&import->imp_lock);
1214         class_import_put(import);
1215 }
1216 EXPORT_SYMBOL(class_destroy_import);
1217
1218 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1219
1220 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1221 {
1222         spin_lock(&exp->exp_locks_list_guard);
1223
1224         LASSERT(lock->l_exp_refs_nr >= 0);
1225
1226         if (lock->l_exp_refs_target != NULL &&
1227             lock->l_exp_refs_target != exp) {
1228                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1229                               exp, lock, lock->l_exp_refs_target);
1230         }
1231         if ((lock->l_exp_refs_nr++) == 0) {
1232                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1233                 lock->l_exp_refs_target = exp;
1234         }
1235         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1236                lock, exp, lock->l_exp_refs_nr);
1237         spin_unlock(&exp->exp_locks_list_guard);
1238 }
1239 EXPORT_SYMBOL(__class_export_add_lock_ref);
1240
1241 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1242 {
1243         spin_lock(&exp->exp_locks_list_guard);
1244         LASSERT(lock->l_exp_refs_nr > 0);
1245         if (lock->l_exp_refs_target != exp) {
1246                 LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
1247                               lock, lock->l_exp_refs_target, exp);
1248         }
1249         if (-- lock->l_exp_refs_nr == 0) {
1250                 list_del_init(&lock->l_exp_refs_link);
1251                 lock->l_exp_refs_target = NULL;
1252         }
1253         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1254                lock, exp, lock->l_exp_refs_nr);
1255         spin_unlock(&exp->exp_locks_list_guard);
1256 }
1257 EXPORT_SYMBOL(__class_export_del_lock_ref);
1258 #endif
1259
1260 /* A connection defines an export context in which preallocation can be
1261  * managed. This releases the export pointer reference, and returns the export
1262  * handle, so the export refcount is 1 when this function returns.
1263  */
1264 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1265                   struct obd_uuid *cluuid)
1266 {
1267         struct obd_export *export;
1268
1269         LASSERT(conn != NULL);
1270         LASSERT(obd != NULL);
1271         LASSERT(cluuid != NULL);
1272         ENTRY;
1273
1274         export = class_new_export(obd, cluuid);
1275         if (IS_ERR(export))
1276                 RETURN(PTR_ERR(export));
1277
1278         conn->cookie = export->exp_handle.h_cookie;
1279         class_export_put(export);
1280
1281         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1282                cluuid->uuid, conn->cookie);
1283         RETURN(0);
1284 }
1285 EXPORT_SYMBOL(class_connect);
1286
1287 /* if export is involved in recovery then clean up related things */
1288 static void class_export_recovery_cleanup(struct obd_export *exp)
1289 {
1290         struct obd_device *obd = exp->exp_obd;
1291
1292         spin_lock(&obd->obd_recovery_task_lock);
1293         if (obd->obd_recovering) {
1294                 if (exp->exp_in_recovery) {
1295                         spin_lock(&exp->exp_lock);
1296                         exp->exp_in_recovery = 0;
1297                         spin_unlock(&exp->exp_lock);
1298                         LASSERT(atomic_read(&(obd)->obd_connected_clients) > 0);
1299                         atomic_dec(&obd->obd_connected_clients);
1300                 }
1301
1302                 /* if called during recovery then should update
1303                  * obd_stale_clients counter, lightweight exports is not counted
1304                  */
1305                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1306                         exp->exp_obd->obd_stale_clients++;
1307         }
1308         spin_unlock(&obd->obd_recovery_task_lock);
1309
1310         spin_lock(&exp->exp_lock);
1311         /** Cleanup req replay fields */
1312         if (exp->exp_req_replay_needed) {
1313                 exp->exp_req_replay_needed = 0;
1314
1315                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1316                 atomic_dec(&obd->obd_req_replay_clients);
1317         }
1318
1319         /** Cleanup lock replay data */
1320         if (exp->exp_lock_replay_needed) {
1321                 exp->exp_lock_replay_needed = 0;
1322
1323                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1324                 atomic_dec(&obd->obd_lock_replay_clients);
1325         }
1326         spin_unlock(&exp->exp_lock);
1327 }
1328
1329 /* This function removes 1-3 references from the export:
1330  * 1 - for export pointer passed
1331  * and if disconnect really need
1332  * 2 - removing from hash
1333  * 3 - in client_unlink_export
1334  * The export pointer passed to this function can destroyed
1335  */
1336 int class_disconnect(struct obd_export *export)
1337 {
1338         int already_disconnected;
1339
1340         ENTRY;
1341
1342         if (export == NULL) {
1343                 CWARN("attempting to free NULL export %p\n", export);
1344                 RETURN(-EINVAL);
1345         }
1346
1347         spin_lock(&export->exp_lock);
1348         already_disconnected = export->exp_disconnected;
1349         export->exp_disconnected = 1;
1350 #ifdef HAVE_SERVER_SUPPORT
1351         /*  We hold references of export for uuid hash and nid_hash and export
1352          *  link at least. So it is safe to call rh*table_remove_fast in there.
1353          */
1354         obd_nid_del(export->exp_obd, export);
1355 #endif /* HAVE_SERVER_SUPPORT */
1356         spin_unlock(&export->exp_lock);
1357
1358         /* class_cleanup(), abort_recovery(), and class_fail_export() all end up
1359          * here, and any of them race we shouldn't call extra class_export_puts
1360          */
1361         if (already_disconnected)
1362                 GOTO(no_disconn, already_disconnected);
1363
1364         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1365                export->exp_handle.h_cookie);
1366
1367         class_export_recovery_cleanup(export);
1368         class_unlink_export(export);
1369 no_disconn:
1370         class_export_put(export);
1371         RETURN(0);
1372 }
1373 EXPORT_SYMBOL(class_disconnect);
1374
1375 /* Return non-zero for a fully connected export */
1376 int class_connected_export(struct obd_export *exp)
1377 {
1378         int connected = 0;
1379
1380         if (exp) {
1381                 spin_lock(&exp->exp_lock);
1382                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1383                 spin_unlock(&exp->exp_lock);
1384         }
1385         return connected;
1386 }
1387 EXPORT_SYMBOL(class_connected_export);
1388
1389 static void class_disconnect_export_list(struct list_head *list,
1390                                          enum obd_option flags)
1391 {
1392         int rc;
1393         struct obd_export *exp;
1394
1395         ENTRY;
1396
1397         /* It's possible that an export may disconnect itself, but
1398          * nothing else will be added to this list.
1399          */
1400         while ((exp = list_first_entry_or_null(list, struct obd_export,
1401                                                exp_obd_chain)) != NULL) {
1402                 /* need for safe call CDEBUG after obd_disconnect */
1403                 class_export_get(exp);
1404
1405                 spin_lock(&exp->exp_lock);
1406                 exp->exp_flags = flags;
1407                 spin_unlock(&exp->exp_lock);
1408
1409                 if (obd_uuid_equals(&exp->exp_client_uuid,
1410                                     &exp->exp_obd->obd_uuid)) {
1411                         CDEBUG(D_HA,
1412                                "exp %p export uuid == obd uuid, don't discon\n",
1413                                exp);
1414                         /* Need to delete this now so we don't end up pointing
1415                          * to work_list later when this export is cleaned up.
1416                          */
1417                         list_del_init(&exp->exp_obd_chain);
1418                         class_export_put(exp);
1419                         continue;
1420                 }
1421
1422                 class_export_get(exp);
1423                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), last request at %lld\n",
1424                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1425                        exp, exp->exp_last_request_time);
1426                 /* release one export reference anyway */
1427                 rc = obd_disconnect(exp);
1428
1429                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1430                        obd_export_nid2str(exp), exp, rc);
1431                 class_export_put(exp);
1432         }
1433         EXIT;
1434 }
1435
1436 void class_disconnect_exports(struct obd_device *obd)
1437 {
1438         LIST_HEAD(work_list);
1439
1440         ENTRY;
1441
1442         /* Move all of the exports from obd_exports to a work list, en masse. */
1443         spin_lock(&obd->obd_dev_lock);
1444         list_splice_init(&obd->obd_exports, &work_list);
1445         list_splice_init(&obd->obd_delayed_exports, &work_list);
1446         spin_unlock(&obd->obd_dev_lock);
1447
1448         if (!list_empty(&work_list)) {
1449                 CDEBUG(D_HA, "OBD device %d (%p) has exports, disconnecting them\n",
1450                        obd->obd_minor, obd);
1451                 class_disconnect_export_list(&work_list,
1452                                              exp_flags_from_obd(obd));
1453         } else
1454                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1455                        obd->obd_minor, obd);
1456         EXIT;
1457 }
1458 EXPORT_SYMBOL(class_disconnect_exports);
1459
1460 /* Remove exports that have not completed recovery.
1461  */
1462 void class_disconnect_stale_exports(struct obd_device *obd,
1463                                     int (*test_export)(struct obd_export *))
1464 {
1465         LIST_HEAD(work_list);
1466         struct obd_export *exp, *n;
1467         int evicted = 0;
1468
1469         ENTRY;
1470
1471         spin_lock(&obd->obd_dev_lock);
1472         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1473                                  exp_obd_chain) {
1474                 /* don't count self-export as client */
1475                 if (obd_uuid_equals(&exp->exp_client_uuid,
1476                                     &exp->exp_obd->obd_uuid))
1477                         continue;
1478
1479                 /* don't evict clients which have no slot in last_rcvd
1480                  * (e.g. lightweight connection)
1481                  */
1482                 if (exp->exp_target_data.ted_lr_idx == -1)
1483                         continue;
1484
1485                 spin_lock(&exp->exp_lock);
1486                 if (exp->exp_failed || test_export(exp)) {
1487                         spin_unlock(&exp->exp_lock);
1488                         continue;
1489                 }
1490                 exp->exp_failed = 1;
1491                 atomic_inc(&exp->exp_obd->obd_eviction_count);
1492                 spin_unlock(&exp->exp_lock);
1493
1494                 list_move(&exp->exp_obd_chain, &work_list);
1495                 evicted++;
1496                 CWARN("%s: disconnect stale client %s@%s\n",
1497                       obd->obd_name, exp->exp_client_uuid.uuid,
1498                       obd_export_nid2str(exp));
1499                 print_export_data(exp, "EVICTING", 0, D_HA);
1500         }
1501         spin_unlock(&obd->obd_dev_lock);
1502
1503         if (evicted)
1504                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1505                               obd->obd_name, evicted);
1506
1507         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1508                                                  OBD_OPT_ABORT_RECOV);
1509         EXIT;
1510 }
1511 EXPORT_SYMBOL(class_disconnect_stale_exports);
1512
1513 void class_fail_export(struct obd_export *exp)
1514 {
1515         int rc, already_failed;
1516
1517         spin_lock(&exp->exp_lock);
1518         already_failed = exp->exp_failed;
1519         exp->exp_failed = 1;
1520         spin_unlock(&exp->exp_lock);
1521
1522         if (already_failed) {
1523                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1524                        exp, exp->exp_client_uuid.uuid);
1525                 return;
1526         }
1527
1528         atomic_inc(&exp->exp_obd->obd_eviction_count);
1529
1530         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1531                exp, exp->exp_client_uuid.uuid);
1532
1533         if (obd_dump_on_timeout)
1534                 libcfs_debug_dumplog();
1535
1536         /* need for safe call CDEBUG after obd_disconnect */
1537         class_export_get(exp);
1538
1539         /* Callers into obd_disconnect are removing their own ref(eg request) in
1540          * addition to one from hash table. We don't have such a ref so make one
1541          */
1542         class_export_get(exp);
1543         rc = obd_disconnect(exp);
1544         if (rc)
1545                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1546         else
1547                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1548                        exp, exp->exp_client_uuid.uuid);
1549         class_export_put(exp);
1550 }
1551 EXPORT_SYMBOL(class_fail_export);
1552
1553 #ifdef HAVE_SERVER_SUPPORT
1554
1555 static int take_first(struct obd_export *exp, void *data)
1556 {
1557         struct obd_export **expp = data;
1558
1559         if (*expp)
1560                 /* already have one */
1561                 return 0;
1562         if (exp->exp_failed)
1563                 /* Don't want this one */
1564                 return 0;
1565         if (!refcount_inc_not_zero(&exp->exp_handle.h_ref))
1566                 /* Cannot get a ref on this one */
1567                 return 0;
1568         *expp = exp;
1569         return 1;
1570 }
1571
1572 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1573 {
1574         struct lnet_nid nid_key;
1575         struct obd_export *doomed_exp;
1576         int exports_evicted = 0;
1577
1578         libcfs_strnid(&nid_key, nid);
1579
1580         spin_lock(&obd->obd_dev_lock);
1581         /* umount already run. evict thread should stop leaving unmount thread
1582          * to take over
1583          */
1584         if (obd->obd_stopping) {
1585                 spin_unlock(&obd->obd_dev_lock);
1586                 return exports_evicted;
1587         }
1588         spin_unlock(&obd->obd_dev_lock);
1589
1590         doomed_exp = NULL;
1591         while (obd_nid_export_for_each(obd, &nid_key,
1592                                        take_first, &doomed_exp) > 0) {
1593
1594                 LASSERTF(doomed_exp != obd->obd_self_export,
1595                          "self-export is hashed by NID?\n");
1596
1597                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1598                               obd->obd_name,
1599                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1600                               obd_export_nid2str(doomed_exp));
1601
1602                 class_fail_export(doomed_exp);
1603                 class_export_put(doomed_exp);
1604                 exports_evicted++;
1605                 doomed_exp = NULL;
1606         }
1607
1608         if (!exports_evicted)
1609                 CDEBUG(D_HA,
1610                        "%s: can't disconnect NID '%s': no exports found\n",
1611                        obd->obd_name, nid);
1612         return exports_evicted;
1613 }
1614 EXPORT_SYMBOL(obd_export_evict_by_nid);
1615
1616 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1617 {
1618         struct obd_export *doomed_exp = NULL;
1619         struct obd_uuid doomed_uuid;
1620         int exports_evicted = 0;
1621
1622         spin_lock(&obd->obd_dev_lock);
1623         if (obd->obd_stopping) {
1624                 spin_unlock(&obd->obd_dev_lock);
1625                 return exports_evicted;
1626         }
1627         spin_unlock(&obd->obd_dev_lock);
1628
1629         obd_str2uuid(&doomed_uuid, uuid);
1630         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1631                 CERROR("%s: can't evict myself\n", obd->obd_name);
1632                 return exports_evicted;
1633         }
1634
1635         doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1636         if (doomed_exp == NULL) {
1637                 CERROR("%s: can't disconnect %s: no exports found\n",
1638                        obd->obd_name, uuid);
1639         } else {
1640                 CWARN("%s: evicting %s at adminstrative request\n",
1641                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1642                 class_fail_export(doomed_exp);
1643                 class_export_put(doomed_exp);
1644                 obd_uuid_del(obd, doomed_exp);
1645                 exports_evicted++;
1646         }
1647
1648         return exports_evicted;
1649 }
1650 #endif /* HAVE_SERVER_SUPPORT */
1651
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Optional callback that print_export_data() invokes for an export when it
 * is asked to dump locks. Unset (NULL) until a hook is installed; as a
 * file-scope object it is zero-initialised without an explicit initialiser.
 */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1656
1657 static void print_export_data(struct obd_export *exp, const char *status,
1658                               int locks, int debug_level)
1659 {
1660         struct ptlrpc_reply_state *rs;
1661         struct ptlrpc_reply_state *first_reply = NULL;
1662         int nreplies = 0;
1663
1664         spin_lock(&exp->exp_lock);
1665         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1666                             rs_exp_list) {
1667                 if (nreplies == 0)
1668                         first_reply = rs;
1669                 nreplies++;
1670         }
1671         spin_unlock(&exp->exp_lock);
1672
1673         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s %llu stale:%d\n",
1674                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1675                obd_export_nid2str(exp),
1676                refcount_read(&exp->exp_handle.h_ref),
1677                atomic_read(&exp->exp_rpc_count),
1678                atomic_read(&exp->exp_cb_count),
1679                atomic_read(&exp->exp_locks_count),
1680                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1681                nreplies, first_reply, nreplies > 3 ? "..." : "",
1682                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1683 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1684         if (locks && class_export_dump_hook != NULL)
1685                 class_export_dump_hook(exp);
1686 #endif
1687 }
1688
1689 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1690 {
1691         struct obd_export *exp;
1692
1693         spin_lock(&obd->obd_dev_lock);
1694         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1695                 print_export_data(exp, "ACTIVE", locks, debug_level);
1696         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1697                 print_export_data(exp, "UNLINKED", locks, debug_level);
1698         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1699                 print_export_data(exp, "DELAYED", locks, debug_level);
1700         spin_unlock(&obd->obd_dev_lock);
1701 }
1702
1703 void obd_exports_barrier(struct obd_device *obd)
1704 {
1705         int waited = 2;
1706
1707         LASSERT(list_empty(&obd->obd_exports));
1708         spin_lock(&obd->obd_dev_lock);
1709         while (!list_empty(&obd->obd_unlinked_exports)) {
1710                 spin_unlock(&obd->obd_dev_lock);
1711                 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1712                 if (waited > 5 && is_power_of_2(waited)) {
1713                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports more than %d seconds. The obd refcount = %d. Is it stuck?\n",
1714                                       obd->obd_name, waited,
1715                                       kref_read(&obd->obd_refcount));
1716                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1717                 }
1718                 waited *= 2;
1719                 spin_lock(&obd->obd_dev_lock);
1720         }
1721         spin_unlock(&obd->obd_dev_lock);
1722 }
1723 EXPORT_SYMBOL(obd_exports_barrier);
1724
1725 /* Add export to the obd_zombe thread and notify it. */
1726 static void obd_zombie_export_add(struct obd_export *exp)
1727 {
1728         atomic_inc(&obd_stale_export_num);
1729         spin_lock(&exp->exp_obd->obd_dev_lock);
1730         LASSERT(!list_empty(&exp->exp_obd_chain));
1731         list_del_init(&exp->exp_obd_chain);
1732         spin_unlock(&exp->exp_obd->obd_dev_lock);
1733         queue_work(zombie_wq, &exp->exp_zombie_work);
1734 }
1735
1736 /* Add import to the obd_zombe thread and notify it. */
1737 static void obd_zombie_import_add(struct obd_import *imp)
1738 {
1739         LASSERT(imp->imp_sec == NULL);
1740
1741         queue_work(zombie_wq, &imp->imp_zombie_work);
1742 }
1743
1744 /* wait when obd_zombie import/export queues become empty */
1745 void obd_zombie_barrier(void)
1746 {
1747         wait_var_event(&obd_stale_export_num,
1748                         atomic_read(&obd_stale_export_num) == 0);
1749         flush_workqueue(zombie_wq);
1750 }
1751 EXPORT_SYMBOL(obd_zombie_barrier);
1752
1753
1754 struct obd_export *obd_stale_export_get(void)
1755 {
1756         struct obd_export *exp = NULL;
1757
1758         ENTRY;
1759
1760         spin_lock(&obd_stale_export_lock);
1761         if (!list_empty(&obd_stale_exports)) {
1762                 exp = list_first_entry(&obd_stale_exports,
1763                                        struct obd_export, exp_stale_list);
1764                 list_del_init(&exp->exp_stale_list);
1765         }
1766         spin_unlock(&obd_stale_export_lock);
1767
1768         if (exp) {
1769                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1770                        atomic_read(&obd_stale_export_num));
1771         }
1772         RETURN(exp);
1773 }
1774 EXPORT_SYMBOL(obd_stale_export_get);
1775
1776 void obd_stale_export_put(struct obd_export *exp)
1777 {
1778         ENTRY;
1779
1780         LASSERT(list_empty(&exp->exp_stale_list));
1781         if (exp->exp_lock_hash &&
1782             atomic_read(&exp->exp_lock_hash->hs_count)) {
1783                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1784                        atomic_read(&obd_stale_export_num));
1785
1786                 spin_lock_bh(&exp->exp_bl_list_lock);
1787                 spin_lock(&obd_stale_export_lock);
1788                 /* Add to the tail if there is no blocked locks,
1789                  * to the head otherwise.
1790                  */
1791                 if (list_empty(&exp->exp_bl_list))
1792                         list_add_tail(&exp->exp_stale_list,
1793                                       &obd_stale_exports);
1794                 else
1795                         list_add(&exp->exp_stale_list,
1796                                  &obd_stale_exports);
1797
1798                 spin_unlock(&obd_stale_export_lock);
1799                 spin_unlock_bh(&exp->exp_bl_list_lock);
1800         } else {
1801                 class_export_put(exp);
1802         }
1803         EXIT;
1804 }
1805 EXPORT_SYMBOL(obd_stale_export_put);
1806
1807 /**
1808  * Adjust the position of the export in the stale list,
1809  * i.e. move to the head of the list if is needed.
1810  **/
1811 void obd_stale_export_adjust(struct obd_export *exp)
1812 {
1813         LASSERT(exp != NULL);
1814         spin_lock_bh(&exp->exp_bl_list_lock);
1815         spin_lock(&obd_stale_export_lock);
1816
1817         if (!list_empty(&exp->exp_stale_list) &&
1818             !list_empty(&exp->exp_bl_list))
1819                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1820
1821         spin_unlock(&obd_stale_export_lock);
1822         spin_unlock_bh(&exp->exp_bl_list_lock);
1823 }
1824 EXPORT_SYMBOL(obd_stale_export_adjust);
1825
1826 /* start destroy zombie import/export thread */
1827 int obd_zombie_impexp_init(void)
1828 {
1829         zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
1830                                            0, CFS_CPT_ANY,
1831                                            cfs_cpt_number(cfs_cpt_tab));
1832
1833         return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
1834 }
1835
1836 /* stop destroy zombie import/export thread */
1837 void obd_zombie_impexp_stop(void)
1838 {
1839         destroy_workqueue(zombie_wq);
1840         LASSERT(list_empty(&obd_stale_exports));
1841 }
1842
1843 /***** Kernel-userspace comm helpers *******/
1844
1845 /* Get length of entire message, including header */
1846 int kuc_len(int payload_len)
1847 {
1848         return sizeof(struct kuc_hdr) + payload_len;
1849 }
1850 EXPORT_SYMBOL(kuc_len);
1851
1852 /* Get a pointer to kuc header, given a ptr to the payload
1853  * @param p Pointer to payload area
1854  * @returns Pointer to kuc header
1855  */
1856 struct kuc_hdr *kuc_ptr(void *p)
1857 {
1858         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1859
1860         LASSERT(lh->kuc_magic == KUC_MAGIC);
1861         return lh;
1862 }
1863 EXPORT_SYMBOL(kuc_ptr);
1864
1865 /* Alloc space for a message, and fill in header
1866  * @return Pointer to payload area
1867  */
1868 void *kuc_alloc(int payload_len, int transport, int type)
1869 {
1870         struct kuc_hdr *lh;
1871         int len = kuc_len(payload_len);
1872
1873         OBD_ALLOC(lh, len);
1874         if (lh == NULL)
1875                 return ERR_PTR(-ENOMEM);
1876
1877         lh->kuc_magic = KUC_MAGIC;
1878         lh->kuc_transport = transport;
1879         lh->kuc_msgtype = type;
1880         lh->kuc_msglen = len;
1881
1882         return (void *)(lh + 1);
1883 }
1884 EXPORT_SYMBOL(kuc_alloc);
1885
/* Free a message obtained from kuc_alloc().
 * @param p           Pointer to the payload area (not the header)
 * @param payload_len Payload length; the header size is added via kuc_len()
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr;

	hdr = kuc_ptr(p);
	OBD_FREE(hdr, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1894
1895 struct obd_request_slot_waiter {
1896         struct list_head        orsw_entry;
1897         wait_queue_head_t       orsw_waitq;
1898         bool                    orsw_signaled;
1899 };
1900
1901 static bool obd_request_slot_avail(struct client_obd *cli,
1902                                    struct obd_request_slot_waiter *orsw)
1903 {
1904         bool avail;
1905
1906         spin_lock(&cli->cl_loi_list_lock);
1907         avail = !!list_empty(&orsw->orsw_entry);
1908         spin_unlock(&cli->cl_loi_list_lock);
1909
1910         return avail;
1911 };
1912
1913 /*
1914  * For network flow control, the RPC sponsor needs to acquire a credit
1915  * before sending the RPC. The credits count for a connection is defined
1916  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1917  * the subsequent RPC sponsors need to wait until others released their
1918  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1919  */
1920 int obd_get_request_slot(struct client_obd *cli)
1921 {
1922         struct obd_request_slot_waiter   orsw;
1923         int                              rc;
1924
1925         spin_lock(&cli->cl_loi_list_lock);
1926         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
1927                 cli->cl_rpcs_in_flight++;
1928                 spin_unlock(&cli->cl_loi_list_lock);
1929                 return 0;
1930         }
1931
1932         init_waitqueue_head(&orsw.orsw_waitq);
1933         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
1934         orsw.orsw_signaled = false;
1935         spin_unlock(&cli->cl_loi_list_lock);
1936
1937         rc = l_wait_event_abortable(orsw.orsw_waitq,
1938                                     obd_request_slot_avail(cli, &orsw) ||
1939                                     orsw.orsw_signaled);
1940
1941         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1942          * freed but other (such as obd_put_request_slot) is using it.
1943          */
1944         spin_lock(&cli->cl_loi_list_lock);
1945         if (rc != 0) {
1946                 if (!orsw.orsw_signaled) {
1947                         if (list_empty(&orsw.orsw_entry))
1948                                 cli->cl_rpcs_in_flight--;
1949                         else
1950                                 list_del(&orsw.orsw_entry);
1951                 }
1952                 rc = -EINTR;
1953         }
1954
1955         if (orsw.orsw_signaled) {
1956                 LASSERT(list_empty(&orsw.orsw_entry));
1957
1958                 rc = -EINTR;
1959         }
1960         spin_unlock(&cli->cl_loi_list_lock);
1961
1962         return rc;
1963 }
1964 EXPORT_SYMBOL(obd_get_request_slot);
1965
1966 void obd_put_request_slot(struct client_obd *cli)
1967 {
1968         struct obd_request_slot_waiter *orsw;
1969
1970         spin_lock(&cli->cl_loi_list_lock);
1971         cli->cl_rpcs_in_flight--;
1972
1973         /* If there is free slot, wakeup the first waiter. */
1974         if (!list_empty(&cli->cl_flight_waiters) &&
1975             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
1976                 orsw = list_first_entry(&cli->cl_flight_waiters,
1977                                         struct obd_request_slot_waiter,
1978                                         orsw_entry);
1979                 list_del_init(&orsw->orsw_entry);
1980                 cli->cl_rpcs_in_flight++;
1981                 wake_up(&orsw->orsw_waitq);
1982         }
1983         spin_unlock(&cli->cl_loi_list_lock);
1984 }
1985 EXPORT_SYMBOL(obd_put_request_slot);
1986
1987 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1988 {
1989         return cli->cl_max_rpcs_in_flight;
1990 }
1991 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
1992
1993 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1994 {
1995         struct obd_request_slot_waiter *orsw;
1996         __u32                           old;
1997         int                             diff;
1998         int                             i;
1999         int                             rc;
2000
2001         if (max > OBD_MAX_RIF_MAX || max < 1)
2002                 return -ERANGE;
2003
2004         CDEBUG(D_INFO, "%s: max = %u max_mod = %u rif = %u\n",
2005                cli->cl_import->imp_obd->obd_name, max,
2006                cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
2007
2008         if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
2009                    LUSTRE_MDC_NAME) == 0) {
2010                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2011                  * strictly lower that max_rpcs_in_flight
2012                  */
2013                 if (max < 2) {
2014                         CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2015                                cli->cl_import->imp_obd->obd_name);
2016                         return -ERANGE;
2017                 }
2018                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2019                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2020                         if (rc != 0)
2021                                 return rc;
2022                 }
2023         }
2024
2025         spin_lock(&cli->cl_loi_list_lock);
2026         old = cli->cl_max_rpcs_in_flight;
2027         cli->cl_max_rpcs_in_flight = max;
2028         client_adjust_max_dirty(cli);
2029
2030         diff = max - old;
2031
2032         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2033         for (i = 0; i < diff; i++) {
2034                 orsw = list_first_entry_or_null(&cli->cl_loi_read_list,
2035                                                 struct obd_request_slot_waiter,
2036                                                 orsw_entry);
2037                 if (!orsw)
2038                         break;
2039
2040                 list_del_init(&orsw->orsw_entry);
2041                 cli->cl_rpcs_in_flight++;
2042                 wake_up(&orsw->orsw_waitq);
2043         }
2044         spin_unlock(&cli->cl_loi_list_lock);
2045
2046         return 0;
2047 }
2048 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2049
2050 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2051 {
2052         return cli->cl_max_mod_rpcs_in_flight;
2053 }
2054 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2055
2056 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2057 {
2058         struct obd_connect_data *ocd;
2059         __u16 maxmodrpcs;
2060         __u16 prev;
2061
2062         if (max > OBD_MAX_RIF_MAX || max < 1)
2063                 return -ERANGE;
2064
2065         ocd = &cli->cl_import->imp_connect_data;
2066         CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
2067                cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
2068                ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
2069
2070         if (max == OBD_MAX_RIF_MAX)
2071                 max = OBD_MAX_RIF_MAX - 1;
2072
2073         /* Cannot exceed or equal max_rpcs_in_flight.  If we are asked to
2074          * increase this value, also bump up max_rpcs_in_flight to match.
2075          */
2076         if (max >= cli->cl_max_rpcs_in_flight) {
2077                 CDEBUG(D_INFO,
2078                        "%s: increasing max_rpcs_in_flight=%u to allow larger max_mod_rpcs_in_flight=%u\n",
2079                        cli->cl_import->imp_obd->obd_name, max + 1, max);
2080                 obd_set_max_rpcs_in_flight(cli, max + 1);
2081         }
2082
2083         /* cannot exceed max modify RPCs in flight supported by the server,
2084          * but verify ocd_connect_flags is at least initialized first.  If
2085          * not, allow it and fix value later in ptlrpc_connect_set_flags().
2086          */
2087         if (!ocd->ocd_connect_flags) {
2088                 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2089         } else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
2090                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2091                 if (maxmodrpcs == 0) { /* connection not finished yet */
2092                         maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2093                         CDEBUG(D_INFO,
2094                                "%s: partial connect, assume maxmodrpcs=%hu\n",
2095                                cli->cl_import->imp_obd->obd_name, maxmodrpcs);
2096                 }
2097         } else {
2098                 maxmodrpcs = 1;
2099         }
2100         if (max > maxmodrpcs) {
2101                 CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than mdt.*.max_mod_rpcs_in_flight=%hu returned by the MDT server at connection.\n",
2102                        cli->cl_import->imp_obd->obd_name,
2103                        max, maxmodrpcs);
2104                 return -ERANGE;
2105         }
2106
2107         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2108
2109         prev = cli->cl_max_mod_rpcs_in_flight;
2110         cli->cl_max_mod_rpcs_in_flight = max;
2111
2112         /* wakeup waiters if limit has been increased */
2113         if (cli->cl_max_mod_rpcs_in_flight > prev)
2114                 wake_up_locked(&cli->cl_mod_rpcs_waitq);
2115
2116         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2117
2118         return 0;
2119 }
2120 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2121
2122 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2123                                struct seq_file *seq)
2124 {
2125         unsigned long mod_tot = 0, mod_cum;
2126         int i;
2127
2128         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2129         lprocfs_stats_header(seq, ktime_get_real(), cli->cl_mod_rpcs_init, 25,
2130                              ":", true, "");
2131         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2132                    cli->cl_mod_rpcs_in_flight);
2133
2134         seq_puts(seq, "\n\t\t\tmodify\n");
2135         seq_puts(seq, "rpcs in flight        rpcs   %% cum %%\n");
2136
2137         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2138
2139         mod_cum = 0;
2140         for (i = 0; i < OBD_HIST_MAX; i++) {
2141                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2142
2143                 mod_cum += mod;
2144                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2145                            i, mod, pct(mod, mod_tot),
2146                            pct(mod_cum, mod_tot));
2147                 if (mod_cum == mod_tot)
2148                         break;
2149         }
2150
2151         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2152
2153         return 0;
2154 }
2155 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2156
2157 /* The number of modify RPCs sent in parallel is limited
2158  * because the server has a finite number of slots per client to
2159  * store request result and ensure reply reconstruction when needed.
2160  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2161  * that takes into account server limit and cl_max_rpcs_in_flight
2162  * value.
2163  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2164  * one close request is allowed above the maximum.
2165  */
2166 struct mod_waiter {
2167         struct client_obd *cli;
2168         bool close_req;
2169         bool woken;
2170         wait_queue_entry_t wqe;
2171 };
2172 static int claim_mod_rpc_function(wait_queue_entry_t *wq_entry,
2173                                   unsigned int mode, int flags, void *key)
2174 {
2175         struct mod_waiter *w = container_of(wq_entry, struct mod_waiter, wqe);
2176         struct client_obd *cli = w->cli;
2177         bool close_req = w->close_req;
2178         bool avail;
2179         int ret;
2180
2181         /* As woken_wake_function() doesn't remove us from the wait_queue,
2182          * we use own flag to ensure we're called just once.
2183          */
2184         if (w->woken)
2185                 return 0;
2186
2187         /* A slot is available if
2188          * - number of modify RPCs in flight is less than the max
2189          * - it's a close RPC and no other close request is in flight
2190          */
2191         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2192                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2193         if (avail) {
2194                 cli->cl_mod_rpcs_in_flight++;
2195                 if (w->close_req)
2196                         cli->cl_close_rpcs_in_flight++;
2197                 ret = woken_wake_function(wq_entry, mode, flags, key);
2198                 w->woken = true;
2199         } else if (cli->cl_close_rpcs_in_flight)
2200                 /* No other waiter could be woken */
2201                 ret = -1;
2202         else if (key == NULL)
2203                 /* This was not a wakeup from a close completion, so there is no
2204                  * point seeing if there are close waiters to be woken
2205                  */
2206                 ret = -1;
2207         else
2208                 /* There might be be a close we could wake, keep looking */
2209                 ret = 0;
2210         return ret;
2211 }
2212
2213 /* Get a modify RPC slot from the obd client @cli according
2214  * to the kind of operation @opc that is going to be sent
2215  * and the intent @it of the operation if it applies.
2216  * If the maximum number of modify RPCs in flight is reached
2217  * the thread is put to sleep.
2218  * Returns the tag to be set in the request message. Tag 0
2219  * is reserved for non-modifying requests.
2220  */
2221 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2222 {
2223         struct mod_waiter wait = {
2224                 .cli = cli,
2225                 .close_req = (opc == MDS_CLOSE),
2226                 .woken = false,
2227         };
2228         __u16                   i, max;
2229
2230         init_wait(&wait.wqe);
2231         wait.wqe.func = claim_mod_rpc_function;
2232
2233         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2234         __add_wait_queue_entry_tail(&cli->cl_mod_rpcs_waitq, &wait.wqe);
2235         /* This wakeup will only succeed if the maximums haven't
2236          * been reached.  If that happens, wait.woken will be set
2237          * and there will be no need to wait.
2238          */
2239         wake_up_locked(&cli->cl_mod_rpcs_waitq);
2240         while (wait.woken == false) {
2241                 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2242                 wait_woken(&wait.wqe, TASK_UNINTERRUPTIBLE,
2243                            MAX_SCHEDULE_TIMEOUT);
2244                 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2245         }
2246         __remove_wait_queue(&cli->cl_mod_rpcs_waitq, &wait.wqe);
2247
2248         max = cli->cl_max_mod_rpcs_in_flight;
2249         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2250                          cli->cl_mod_rpcs_in_flight);
2251         /* find a free tag */
2252         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2253                                 max + 1);
2254         LASSERT(i < OBD_MAX_RIF_MAX);
2255         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2256         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2257         /* tag 0 is reserved for non-modify RPCs */
2258
2259         CDEBUG(D_RPCTRACE,
2260                "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2261                cli->cl_import->imp_obd->obd_name,
2262                i + 1, opc, max);
2263
2264         return i + 1;
2265 }
2266 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2267
2268 /* Put a modify RPC slot from the obd client @cli according
2269  * to the kind of operation @opc that has been sent.
2270  */
2271 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2272 {
2273         bool                    close_req = false;
2274
2275         if (tag == 0)
2276                 return;
2277
2278         if (opc == MDS_CLOSE)
2279                 close_req = true;
2280
2281         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2282         cli->cl_mod_rpcs_in_flight--;
2283         if (close_req)
2284                 cli->cl_close_rpcs_in_flight--;
2285         /* release the tag in the bitmap */
2286         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2287         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2288         __wake_up_locked_key(&cli->cl_mod_rpcs_waitq, TASK_NORMAL,
2289                              (void *)close_req);
2290         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2291 }
2292 EXPORT_SYMBOL(obd_put_mod_rpc_slot);