Whamcloud - gitweb
7c15eba07ba2dba507f469294e7ed516a8dd34ca
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/obdclass/genops.c
32  *
33  * These are the only exported functions, they provide some generic
34  * infrastructure for managing object devices
35  */
36
37 #define DEBUG_SUBSYSTEM S_CLASS
38
39 #include <linux/pid_namespace.h>
40 #include <linux/workqueue.h>
41 #include <lustre_compat.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
47
48 DEFINE_XARRAY_ALLOC(obd_devs);
49 EXPORT_SYMBOL(obd_devs);
50
51 static atomic_t obd_devs_count = ATOMIC_INIT(0);
52
53 static struct kmem_cache *obd_device_cachep;
54 static struct kobj_type class_ktype;
55 static struct workqueue_struct *zombie_wq;
56
57 static void obd_zombie_export_add(struct obd_export *exp);
58 static void obd_zombie_import_add(struct obd_import *imp);
59 static void print_export_data(struct obd_export *exp,
60                               const char *status, int locks, int debug_level);
61
62 static LIST_HEAD(obd_stale_exports);
63 static DEFINE_SPINLOCK(obd_stale_export_lock);
64 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
65
66 static struct obd_device *obd_device_alloc(void)
67 {
68         struct obd_device *obd;
69
70         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
71         if (obd != NULL)
72                 obd->obd_magic = OBD_DEVICE_MAGIC;
73         return obd;
74 }
75
76 static void obd_device_free(struct obd_device *obd)
77 {
78         LASSERT(obd != NULL);
79         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
80                  "obd %px obd_magic %08x != %08x\n",
81                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
82         if (obd->obd_namespace != NULL) {
83                 CERROR("obd %px: namespace %px was not properly cleaned up (obd_force=%d)!\n",
84                        obd, obd->obd_namespace, obd->obd_force);
85                 LBUG();
86         }
87         lu_ref_fini(&obd->obd_reference);
88         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
89 }
90
91 struct obd_type *class_search_type(const char *name)
92 {
93         struct kobject *kobj = kset_find_obj(lustre_kset, name);
94
95         if (kobj && kobj->ktype == &class_ktype)
96                 return container_of(kobj, struct obd_type, typ_kobj);
97
98         kobject_put(kobj);
99         return NULL;
100 }
101 EXPORT_SYMBOL(class_search_type);
102
/*
 * Look up the obd_type called \a name and take a usage reference on it.
 *
 * When the type is not registered and HAVE_MODULE_LOADING_SUPPORT is
 * defined, the matching kernel module is requested and the lookup is
 * retried once.
 *
 * On success the owning module has been pinned with try_module_get() and
 * typ_refcnt has been incremented; class_put_type() in this file does the
 * inverse of both.  Returns NULL when the type is unknown, is being
 * unregistered (typ_dt_ops already NULL), or its module reference could
 * not be taken.
 */
struct obd_type *class_get_type(const char *name)
{
        struct obd_type *type;

        rcu_read_lock();
        type = class_search_type(name);
#ifdef HAVE_MODULE_LOADING_SUPPORT
        if (!type) {
                const char *modname = name;

#ifdef HAVE_SERVER_SUPPORT
                /* some type names are provided by a differently named module */
                if (strcmp(modname, "obdfilter") == 0)
                        modname = "ofd";

                if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
                        modname = LUSTRE_OSP_NAME;

                if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
                        modname = LUSTRE_MDT_NAME;
#endif /* HAVE_SERVER_SUPPORT */

                /*
                 * The RCU read lock is dropped around request_module() and
                 * re-taken before the second lookup (presumably because
                 * module loading may sleep).
                 */
                rcu_read_unlock();
                if (!request_module("%s", modname)) {
                        CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
                } else {
                        LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
                                           modname);
                }
                rcu_read_lock();
                /* retry with the original name, not the module name */
                type = class_search_type(name);
        }
#endif
        if (type) {
                /*
                 * Holding rcu_read_lock() matches the synchronize_rcu() call
                 * in free_module() and ensures that if type->typ_dt_ops is
                 * not yet NULL, then the module won't be freed until after
                 * we rcu_read_unlock().
                 */
                const struct obd_ops *dt_ops = READ_ONCE(type->typ_dt_ops);

                if (dt_ops && try_module_get(dt_ops->o_owner)) {
                        atomic_inc(&type->typ_refcnt);
                        /* class_search_type() returned a counted ref, this
                         * count not needed as we could get it via typ_refcnt
                         */
                        kobject_put(&type->typ_kobj);
                } else {
                        /* type is going away or module is unloading:
                         * drop the lookup reference and report failure
                         */
                        kobject_put(&type->typ_kobj);
                        type = NULL;
                }
        }
        rcu_read_unlock();
        return type;
}
158 EXPORT_SYMBOL(class_get_type);
159
160 void class_put_type(struct obd_type *type)
161 {
162         LASSERT(type);
163         module_put(type->typ_dt_ops->o_owner);
164         atomic_dec(&type->typ_refcnt);
165 }
166 EXPORT_SYMBOL(class_put_type);
167
168 static void class_sysfs_release(struct kobject *kobj)
169 {
170         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
171
172         debugfs_remove_recursive(type->typ_debugfs_entry);
173         type->typ_debugfs_entry = NULL;
174
175         if (type->typ_lu)
176                 lu_device_type_fini(type->typ_lu);
177
178 #ifdef CONFIG_PROC_FS
179         if (type->typ_name && type->typ_procroot)
180                 remove_proc_subtree(type->typ_name, proc_lustre_root);
181 #endif
182         OBD_FREE(type, sizeof(*type));
183 }
184
185 static struct kobj_type class_ktype = {
186         .sysfs_ops      = &lustre_sysfs_ops,
187         .release        = class_sysfs_release,
188 };
189
190 #ifdef HAVE_SERVER_SUPPORT
191 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
192 {
193         struct dentry *symlink;
194         struct obd_type *type;
195         int rc;
196
197         type = class_search_type(name);
198         if (type) {
199                 kobject_put(&type->typ_kobj);
200                 return ERR_PTR(-EEXIST);
201         }
202
203         OBD_ALLOC(type, sizeof(*type));
204         if (!type)
205                 return ERR_PTR(-ENOMEM);
206
207         type->typ_kobj.kset = lustre_kset;
208         rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
209                                   &lustre_kset->kobj, "%s", name);
210         if (rc)
211                 return ERR_PTR(rc);
212
213         symlink = debugfs_create_dir(name, debugfs_lustre_root);
214         type->typ_debugfs_entry = symlink;
215         type->typ_sym_filter = true;
216
217         if (enable_proc) {
218                 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
219                                                       NULL, NULL);
220                 if (IS_ERR(type->typ_procroot)) {
221                         CERROR("%s: can't create compat proc entry: %d\n",
222                                name, (int)PTR_ERR(type->typ_procroot));
223                         type->typ_procroot = NULL;
224                 }
225         }
226
227         return type;
228 }
229 EXPORT_SYMBOL(class_add_symlinks);
230 #endif /* HAVE_SERVER_SUPPORT */
231
232 #define CLASS_MAX_NAME 1024
233
/*
 * Register a new obd_type called \a name.
 *
 * \param[in] dt_ops       obd_ops stored in typ_dt_ops
 * \param[in] md_ops       md_ops stored in typ_md_ops
 * \param[in] enable_proc  register a directory under proc_lustre_root
 *                         (only when CONFIG_PROC_FS is defined)
 * \param[in] name         type name; must be shorter than CLASS_MAX_NAME
 * \param[in] ldt          optional lu_device_type, initialised with
 *                         lu_device_type_init() and published in typ_lu
 *
 * \retval 0        success
 * \retval -EEXIST  a type with this name is already registered
 * \retval -ENOMEM  allocation failed
 * \retval errno    lprocfs_register(), kobject_add() or
 *                  lu_device_type_init() failed
 */
int class_register_type(const struct obd_ops *dt_ops,
                        const struct md_ops *md_ops,
                        bool enable_proc,
                        const char *name, struct lu_device_type *ldt)
{
        struct obd_type *type;
        int rc;

        ENTRY;
        /* sanity check */
        LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);

        type = class_search_type(name);
        if (type) {
#ifdef HAVE_SERVER_SUPPORT
                /* a placeholder flagged with typ_sym_filter (as set by
                 * class_add_symlinks()) is reused rather than rejected
                 */
                if (type->typ_sym_filter)
                        goto dir_exist;
#endif /* HAVE_SERVER_SUPPORT */
                kobject_put(&type->typ_kobj);
                CDEBUG(D_IOCTL, "Type %s already registered\n", name);
                RETURN(-EEXIST);
        }

        OBD_ALLOC(type, sizeof(*type));
        if (type == NULL)
                RETURN(-ENOMEM);

        /* typ_lu holds the OBD_LU_TYPE_SETUP placeholder until
         * lu_device_type_init() below has run
         */
        type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
        type->typ_kobj.kset = lustre_kset;
        kobject_init(&type->typ_kobj, &class_ktype);
#ifdef HAVE_SERVER_SUPPORT
dir_exist:
#endif /* HAVE_SERVER_SUPPORT */

        type->typ_dt_ops = dt_ops;
        type->typ_md_ops = md_ops;

#ifdef HAVE_SERVER_SUPPORT
        if (type->typ_sym_filter) {
                /* placeholder already has its directories: clear the flag,
                 * drop the class_search_type() reference and skip straight
                 * to the lu_device_type setup
                 */
                type->typ_sym_filter = false;
                kobject_put(&type->typ_kobj);
                goto setup_ldt;
        }
#endif
#ifdef CONFIG_PROC_FS
        if (enable_proc && !type->typ_procroot) {
                type->typ_procroot = lprocfs_register(name,
                                                      proc_lustre_root,
                                                      NULL, type);
                if (IS_ERR(type->typ_procroot)) {
                        rc = PTR_ERR(type->typ_procroot);
                        /* never leave an ERR_PTR in typ_procroot */
                        type->typ_procroot = NULL;
                        GOTO(failed, rc);
                }
        }
#endif
        type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);

        rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
        if (rc)
                GOTO(failed, rc);
#ifdef HAVE_SERVER_SUPPORT
setup_ldt:
#endif
        if (ldt) {
                rc = lu_device_type_init(ldt);
                /* publish the result (ldt, or NULL on failure) and wake
                 * anyone waiting on typ_lu
                 */
                smp_store_release(&type->typ_lu, rc ? NULL : ldt);
                wake_up_var(&type->typ_lu);
                if (rc)
                        GOTO(failed, rc);
        }

        RETURN(0);

failed:
        /*
         * Dropping this reference lets class_sysfs_release() tear down and
         * free the type.
         * NOTE(review): when this label is reached before setup_ldt with
         * ldt != NULL, typ_lu still holds OBD_LU_TYPE_SETUP, and
         * class_sysfs_release() passes any non-NULL typ_lu to
         * lu_device_type_fini() - confirm that is safe.
         */
        kobject_put(&type->typ_kobj);

        RETURN(rc);
}
313 EXPORT_SYMBOL(class_register_type);
314
315 int class_unregister_type(const char *name)
316 {
317         struct obd_type *type = class_search_type(name);
318         int rc = 0;
319
320         ENTRY;
321         if (!type) {
322                 CERROR("unknown obd type\n");
323                 RETURN(-EINVAL);
324         }
325
326         /*
327          * Ensure that class_get_type doesn't try to get the module
328          * as it could be freed before the obd_type is released.
329          * synchronize_rcu() will be called before the module
330          * is freed.
331          */
332         type->typ_dt_ops = NULL;
333
334         if (atomic_read(&type->typ_refcnt)) {
335                 CERROR("type %s has refcount (%d)\n", name,
336                        atomic_read(&type->typ_refcnt));
337                 /* This is a bad situation, let's make the best of it */
338                 /* Remove ops, but leave the name for debugging */
339                 type->typ_md_ops = NULL;
340                 GOTO(out_put, rc = -EBUSY);
341         }
342
343         /* Put the final ref */
344         kobject_put(&type->typ_kobj);
345 out_put:
346         /* Put the ref returned by class_search_type() */
347         kobject_put(&type->typ_kobj);
348
349         RETURN(rc);
350 } /* class_unregister_type */
351 EXPORT_SYMBOL(class_unregister_type);
352
353 /**
354  * Create a new obd device.
355  *
356  * Allocate the new obd_device and initialize it.
357  *
358  * \param[in] type_name obd device type string.
359  * \param[in] name      obd device name.
360  * \param[in] uuid      obd device UUID
361  *
362  * \retval newdev         pointer to created obd_device
363  * \retval ERR_PTR(errno) on error
364  */
365 struct obd_device *class_newdev(const char *type_name, const char *name,
366                                 const char *uuid)
367 {
368         struct obd_device *newdev;
369         struct obd_type *type = NULL;
370
371         ENTRY;
372
373         if (strlen(name) >= MAX_OBD_NAME) {
374                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
375                 RETURN(ERR_PTR(-EINVAL));
376         }
377
378         type = class_get_type(type_name);
379         if (type == NULL) {
380                 CERROR("OBD: unknown type: %s\n", type_name);
381                 RETURN(ERR_PTR(-ENODEV));
382         }
383
384         newdev = obd_device_alloc();
385         if (newdev == NULL) {
386                 class_put_type(type);
387                 RETURN(ERR_PTR(-ENOMEM));
388         }
389         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
390         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
391         newdev->obd_type = type;
392         newdev->obd_minor = -1;
393
394         rwlock_init(&newdev->obd_pool_lock);
395         newdev->obd_pool_limit = 0;
396         newdev->obd_pool_slv = 0;
397
398         INIT_LIST_HEAD(&newdev->obd_exports);
399         newdev->obd_num_exports = 0;
400         newdev->obd_grant_check_threshold = 100;
401         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
402         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
403         INIT_LIST_HEAD(&newdev->obd_exports_timed);
404         INIT_LIST_HEAD(&newdev->obd_nid_stats);
405         spin_lock_init(&newdev->obd_nid_lock);
406         spin_lock_init(&newdev->obd_dev_lock);
407         mutex_init(&newdev->obd_dev_mutex);
408         spin_lock_init(&newdev->obd_osfs_lock);
409         /* newdev->obd_osfs_age must be set to a value in the distant
410          * past to guarantee a fresh statfs is fetched on mount.
411          */
412         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
413
414         /* XXX belongs in setup not attach  */
415         init_rwsem(&newdev->obd_observer_link_sem);
416         /* recovery data */
417         spin_lock_init(&newdev->obd_recovery_task_lock);
418         init_waitqueue_head(&newdev->obd_next_transno_waitq);
419         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
420         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
421         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
422         INIT_LIST_HEAD(&newdev->obd_evict_list);
423         INIT_LIST_HEAD(&newdev->obd_lwp_list);
424
425         llog_group_init(&newdev->obd_olg);
426         /* Detach drops this */
427         kref_init(&newdev->obd_refcount);
428         lu_ref_init(&newdev->obd_reference);
429         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
430
431         atomic_set(&newdev->obd_conn_inprogress, 0);
432
433         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
434
435         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
436                newdev->obd_name, newdev);
437
438         return newdev;
439 }
440
441 /**
442  * Free obd device.
443  *
444  * \param[in] obd obd_device to be freed
445  *
446  * \retval none
447  */
448 void class_free_dev(struct obd_device *obd)
449 {
450         struct obd_type *obd_type = obd->obd_type;
451
452         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
453                  "%px obd_magic %08x != %08x\n",
454                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
455         LASSERTF(obd->obd_minor == -1 || class_num2obd(obd->obd_minor) == obd,
456                  "obd %px != obd_devs[%d] %px\n",
457                  obd, obd->obd_minor, class_num2obd(obd->obd_minor));
458         LASSERTF(kref_read(&obd->obd_refcount) == 0,
459                  "obd_refcount should be 0, not %d\n",
460                  kref_read(&obd->obd_refcount));
461         LASSERT(obd_type != NULL);
462
463         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
464                obd->obd_name, obd->obd_type->typ_name);
465
466         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
467                          obd->obd_name, obd->obd_uuid.uuid);
468         if (obd->obd_stopping) {
469                 int err;
470
471                 /* If we're not stopping, we were never set up */
472                 err = obd_cleanup(obd);
473                 if (err)
474                         CERROR("Cleanup %s returned %d\n",
475                                 obd->obd_name, err);
476         }
477
478         obd_device_free(obd);
479
480         class_put_type(obd_type);
481 }
482
483 /**
484  * Unregister obd device.
485  *
486  * Remove an obd from obd_dev
487  *
488  * \param[in] new_obd obd_device to be unregistered
489  *
490  * \retval none
491  */
492 void class_unregister_device(struct obd_device *obd)
493 {
494         if (obd->obd_minor >= 0) {
495                 xa_erase(&obd_devs, obd->obd_minor);
496                 class_decref(obd, "obd_device_list", obd);
497                 obd->obd_minor = -1;
498                 atomic_dec(&obd_devs_count);
499         }
500 }
501
502 /**
503  * Register obd device.
504  *
505  * Add new_obd to obd_devs
506  *
507  * \param[in] new_obd obd_device to be registered
508  *
509  * \retval 0          success
510  * \retval -EEXIST    device with this name is registered
511  */
512 int class_register_device(struct obd_device *new_obd)
513 {
514         int rc = 0;
515         int dev_no = 0;
516
517         if (new_obd == NULL) {
518                 rc = -1;
519                 goto out;
520         }
521
522         /* obd_device waiting to be destroyed by "obd_zombie_impexp_thread" */
523         if (class_name2dev(new_obd->obd_name) != -1)
524                 obd_zombie_barrier();
525
526         if (class_name2dev(new_obd->obd_name) == -1) {
527                 class_incref(new_obd, "obd_device_list", new_obd);
528                 rc = xa_alloc(&obd_devs, &dev_no, new_obd,
529                               xa_limit_31b, GFP_ATOMIC);
530
531                 if (rc != 0)
532                         goto out;
533
534                 new_obd->obd_minor = dev_no;
535                 atomic_inc(&obd_devs_count);
536         } else {
537                 rc = -EEXIST;
538         }
539
540 out:
541         RETURN(rc);
542 }
543
544 int class_name2dev(const char *name)
545 {
546         struct obd_device *obd = NULL;
547         unsigned long dev_no = 0;
548         int ret;
549
550         if (!name)
551                 return -1;
552
553         obd_device_lock();
554         obd_device_for_each(dev_no, obd) {
555                 if (strcmp(name, obd->obd_name) == 0) {
556                         /*
557                          * Make sure we finished attaching before we give
558                          * out any references
559                          */
560                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
561                         if (obd->obd_attached) {
562                                 ret = obd->obd_minor;
563                                 obd_device_unlock();
564                                 return ret;
565                         }
566                         break;
567                 }
568         }
569         obd_device_unlock();
570
571         return -1;
572 }
573 EXPORT_SYMBOL(class_name2dev);
574
575 struct obd_device *class_name2obd(const char *name)
576 {
577         struct obd_device *obd = NULL;
578         unsigned long dev_no = 0;
579
580         if (!name)
581                 return NULL;
582
583         obd_device_lock();
584         obd_device_for_each(dev_no, obd) {
585                 if (strcmp(name, obd->obd_name) == 0) {
586                         /*
587                          * Make sure we finished attaching before we give
588                          * out any references
589                          */
590                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
591                         if (obd->obd_attached)
592                                 break;
593                 }
594         }
595         obd_device_unlock();
596
597         /*
598          * TODO: We give out a reference without class_incref(). This isn't
599          * ideal, but this behavior is identical in previous implementations
600          * of this function.
601          */
602         return obd;
603 }
604 EXPORT_SYMBOL(class_name2obd);
605
606 int class_uuid2dev(struct obd_uuid *uuid)
607 {
608         struct obd_device *obd = NULL;
609         unsigned long dev_no = 0;
610         int ret;
611
612         obd_device_lock();
613         obd_device_for_each(dev_no, obd) {
614                 if (obd_uuid_equals(uuid, &obd->obd_uuid)) {
615                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
616                         ret = obd->obd_minor;
617                         obd_device_unlock();
618                         return ret;
619                 }
620         }
621         obd_device_unlock();
622
623         return -1;
624 }
625 EXPORT_SYMBOL(class_uuid2dev);
626
627 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
628 {
629         struct obd_device *obd = NULL;
630         unsigned long dev_no = 0;
631
632         obd_device_lock();
633         obd_device_for_each(dev_no, obd) {
634                 if (obd_uuid_equals(uuid, &obd->obd_uuid)) {
635                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
636                         break;
637                 }
638         }
639         obd_device_unlock();
640
641         /*
642          * TODO: We give out a reference without class_incref(). This isn't
643          * ideal, but this behavior is identical in previous implementations
644          * of this function.
645          */
646         return obd;
647 }
648 EXPORT_SYMBOL(class_uuid2obd);
649
650 struct obd_device *class_num2obd(int dev_no)
651 {
652         return xa_load(&obd_devs, dev_no);
653 }
654 EXPORT_SYMBOL(class_num2obd);
655
656 /**
657  * Find obd by name or uuid.
658  *
659  * Increment obd's refcount if found.
660  *
661  * \param[in] str obd name or uuid
662  *
663  * \retval NULL    if not found
664  * \retval obd     pointer to found obd_device
665  */
666 struct obd_device *class_str2obd(const char *str)
667 {
668         struct obd_device *obd = NULL;
669         struct obd_uuid uuid;
670         unsigned long dev_no = 0;
671
672         obd_str2uuid(&uuid, str);
673
674         obd_device_lock();
675         obd_device_for_each(dev_no, obd) {
676                 if (obd_uuid_equals(&uuid, &obd->obd_uuid) ||
677                     (strcmp(str, obd->obd_name) == 0)) {
678                         /*
679                          * Make sure we finished attaching before we give
680                          * out any references
681                          */
682                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
683                         if (obd->obd_attached) {
684                                 class_incref(obd, "find", current);
685                                 break;
686                         }
687                         obd_device_unlock();
688                         RETURN(NULL);
689                 }
690         }
691         obd_device_unlock();
692
693         RETURN(obd);
694 }
695 EXPORT_SYMBOL(class_str2obd);
696
697 /**
698  * Get obd devices count. Device in any
699  *    state are counted
700  * \retval obd device count
701  */
702 int class_obd_devs_count(void)
703 {
704         return atomic_read(&obd_devs_count);
705 }
706 EXPORT_SYMBOL(class_obd_devs_count);
707
708 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
709  * specified, then only the client with that uuid is returned,
710  * otherwise any client connected to the tgt is returned.
711  */
712 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
713                                          const char *type_name,
714                                          struct obd_uuid *grp_uuid)
715 {
716         struct obd_device *obd = NULL;
717         unsigned long dev_no = 0;
718
719         obd_device_lock();
720         obd_device_for_each(dev_no, obd) {
721                 if ((strncmp(obd->obd_type->typ_name, type_name,
722                              strlen(type_name)) == 0)) {
723                         if (obd_uuid_equals(tgt_uuid,
724                                             &obd->u.cli.cl_target_uuid) &&
725                             ((grp_uuid) ? obd_uuid_equals(grp_uuid,
726                                                          &obd->obd_uuid) : 1)) {
727                                 obd_device_unlock();
728                                 return obd;
729                         }
730                 }
731         }
732         obd_device_unlock();
733
734         return NULL;
735 }
736 EXPORT_SYMBOL(class_find_client_obd);
737
738 /**
739  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
740  * adjust sptlrpc settings accordingly.
741  */
742 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
743 {
744         struct obd_device *obd = NULL;
745         unsigned long dev_no = 0;
746         const char *type;
747         int rc = 0, rc2;
748
749         LASSERT(namelen > 0);
750
751         obd_device_lock();
752         obd_device_for_each(dev_no, obd) {
753                 if (obd->obd_set_up == 0 || obd->obd_stopping)
754                         continue;
755
756                 /* only notify mdc, osc, osp, lwp, mdt, ost
757                  * because only these have a -sptlrpc llog
758                  */
759                 type = obd->obd_type->typ_name;
760                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
761                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
762                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
763                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
764                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
765                     strcmp(type, LUSTRE_OST_NAME) != 0)
766                         continue;
767
768                 if (strncmp(obd->obd_name, fsname, namelen))
769                         continue;
770
771                 class_incref(obd, __func__, obd);
772                 obd_device_unlock();
773                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
774                                          sizeof(KEY_SPTLRPC_CONF),
775                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
776                 rc = rc ? rc : rc2;
777                 obd_device_lock();
778                 class_decref(obd, __func__, obd);
779         }
780         obd_device_unlock();
781
782         return rc;
783 }
784 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
785
786 void obd_cleanup_caches(void)
787 {
788         ENTRY;
789         if (obd_device_cachep) {
790                 kmem_cache_destroy(obd_device_cachep);
791                 obd_device_cachep = NULL;
792         }
793
794         EXIT;
795 }
796
797 int obd_init_caches(void)
798 {
799         int rc;
800
801         ENTRY;
802
803         LASSERT(obd_device_cachep == NULL);
804         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
805                                 sizeof(struct obd_device),
806                                 0, 0, 0, sizeof(struct obd_device), NULL);
807         if (!obd_device_cachep)
808                 GOTO(out, rc = -ENOMEM);
809
810         RETURN(0);
811 out:
812         obd_cleanup_caches();
813         RETURN(rc);
814 }
815
816 static const char export_handle_owner[] = "export";
817
818 /* map connection to client */
819 struct obd_export *class_conn2export(struct lustre_handle *conn)
820 {
821         struct obd_export *export;
822
823         ENTRY;
824
825         if (!conn) {
826                 CDEBUG(D_CACHE, "looking for null handle\n");
827                 RETURN(NULL);
828         }
829
830         if (conn->cookie == -1) {  /* this means assign a new connection */
831                 CDEBUG(D_CACHE, "want a new connection\n");
832                 RETURN(NULL);
833         }
834
835         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
836         export = class_handle2object(conn->cookie, export_handle_owner);
837         RETURN(export);
838 }
839 EXPORT_SYMBOL(class_conn2export);
840
841 struct obd_device *class_exp2obd(struct obd_export *exp)
842 {
843         if (exp)
844                 return exp->exp_obd;
845         return NULL;
846 }
847 EXPORT_SYMBOL(class_exp2obd);
848
849 struct obd_import *class_exp2cliimp(struct obd_export *exp)
850 {
851         struct obd_device *obd = exp->exp_obd;
852
853         if (obd == NULL)
854                 return NULL;
855         return obd->u.cli.cl_import;
856 }
857 EXPORT_SYMBOL(class_exp2cliimp);
858
859 /* Export management functions */
860 static void class_export_destroy(struct obd_export *exp)
861 {
862         struct obd_device *obd = exp->exp_obd;
863
864         ENTRY;
865
866         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
867         LASSERT(obd != NULL);
868
869         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
870                exp->exp_client_uuid.uuid, obd->obd_name);
871
872         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
873         ptlrpc_connection_put(exp->exp_connection);
874
875         LASSERT(list_empty(&exp->exp_outstanding_replies));
876         LASSERT(list_empty(&exp->exp_uncommitted_replies));
877         LASSERT(list_empty(&exp->exp_req_replay_queue));
878         LASSERT(list_empty(&exp->exp_hp_rpcs));
879         obd_destroy_export(exp);
880         /* self export doesn't hold a reference to an obd, although it
881          * exists until freeing of the obd
882          */
883         if (exp != obd->obd_self_export)
884                 class_decref(obd, "export", exp);
885
886         OBD_FREE_PRE(exp, sizeof(*exp), "kfree_rcu");
887         kfree_rcu(exp, exp_handle.h_rcu);
888         EXIT;
889 }
890
891 struct obd_export *class_export_get(struct obd_export *exp)
892 {
893         refcount_inc(&exp->exp_handle.h_ref);
894         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
895                refcount_read(&exp->exp_handle.h_ref));
896         return exp;
897 }
898 EXPORT_SYMBOL(class_export_get);
899
900 void class_export_put(struct obd_export *exp)
901 {
902         LASSERT(exp != NULL);
903         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
904         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
905         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
906                refcount_read(&exp->exp_handle.h_ref) - 1);
907
908         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
909                 struct obd_device *obd = exp->exp_obd;
910
911                 CDEBUG(D_IOCTL, "final put %p/%s\n",
912                        exp, exp->exp_client_uuid.uuid);
913
914                 /* release nid stat refererence */
915                 lprocfs_exp_cleanup(exp);
916
917                 if (exp == obd->obd_self_export) {
918                         /* self export should be destroyed without zombie
919                          * thread as it doesn't hold a reference to obd and
920                          * doesn't hold any resources
921                          */
922                         class_export_destroy(exp);
923                         /* self export is destroyed, no class ref exist and it
924                          * is safe to free obd
925                          */
926                         class_free_dev(obd);
927                 } else {
928                         LASSERT(!list_empty(&exp->exp_obd_chain));
929                         obd_zombie_export_add(exp);
930                 }
931
932         }
933 }
934 EXPORT_SYMBOL(class_export_put);
935
936 static void obd_zombie_exp_cull(struct work_struct *ws)
937 {
938         struct obd_export *export;
939
940         export = container_of(ws, struct obd_export, exp_zombie_work);
941         class_export_destroy(export);
942         LASSERT(atomic_read(&obd_stale_export_num) > 0);
943         if (atomic_dec_and_test(&obd_stale_export_num))
944                 wake_up_var(&obd_stale_export_num);
945 }
946
947 /* Creates a new export, adds it to the hash table, and returns a
948  * pointer to it. The refcount is 2: one for the hash reference, and
949  * one for the pointer returned by this function.
950  */
951 static struct obd_export *__class_new_export(struct obd_device *obd,
952                                              struct obd_uuid *cluuid,
953                                              bool is_self)
954 {
955         struct obd_export *export;
956         int rc = 0;
957
958         ENTRY;
959
960         OBD_ALLOC_PTR(export);
961         if (!export)
962                 return ERR_PTR(-ENOMEM);
963
964         export->exp_conn_cnt = 0;
965         export->exp_lock_hash = NULL;
966         export->exp_flock_hash = NULL;
967         /* 2 = class_handle_hash + last */
968         refcount_set(&export->exp_handle.h_ref, 2);
969         atomic_set(&export->exp_rpc_count, 0);
970         atomic_set(&export->exp_cb_count, 0);
971         atomic_set(&export->exp_locks_count, 0);
972 #if LUSTRE_TRACKS_LOCK_EXP_REFS
973         INIT_LIST_HEAD(&export->exp_locks_list);
974         spin_lock_init(&export->exp_locks_list_guard);
975 #endif
976         atomic_set(&export->exp_replay_count, 0);
977         export->exp_obd = obd;
978         INIT_LIST_HEAD(&export->exp_outstanding_replies);
979         spin_lock_init(&export->exp_uncommitted_replies_lock);
980         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
981         INIT_LIST_HEAD(&export->exp_req_replay_queue);
982         INIT_HLIST_NODE(&export->exp_handle.h_link);
983         INIT_LIST_HEAD(&export->exp_hp_rpcs);
984         INIT_LIST_HEAD(&export->exp_reg_rpcs);
985         class_handle_hash(&export->exp_handle, export_handle_owner);
986         export->exp_last_request_time = ktime_get_real_seconds();
987         spin_lock_init(&export->exp_lock);
988         spin_lock_init(&export->exp_rpc_lock);
989         INIT_HLIST_NODE(&export->exp_gen_hash);
990         spin_lock_init(&export->exp_bl_list_lock);
991         INIT_LIST_HEAD(&export->exp_bl_list);
992         INIT_LIST_HEAD(&export->exp_stale_list);
993         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
994
995         export->exp_sp_peer = LUSTRE_SP_ANY;
996         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
997         export->exp_client_uuid = *cluuid;
998         obd_init_export(export);
999
1000         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1001         export->exp_root_fid.f_seq = 0;
1002         export->exp_root_fid.f_oid = 0;
1003         export->exp_root_fid.f_ver = 0;
1004
1005         spin_lock(&obd->obd_dev_lock);
1006         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1007                 /* shouldn't happen, but might race */
1008                 if (obd->obd_stopping)
1009                         GOTO(exit_unlock, rc = -ENODEV);
1010
1011                 rc = obd_uuid_add(obd, export);
1012                 if (rc != 0) {
1013                         LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1014                                       obd->obd_name, cluuid->uuid, rc);
1015                         GOTO(exit_unlock, rc = -EALREADY);
1016                 }
1017         }
1018
1019         if (!is_self) {
1020                 class_incref(obd, "export", export);
1021                 list_add_tail(&export->exp_obd_chain_timed,
1022                               &obd->obd_exports_timed);
1023                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1024                 obd->obd_num_exports++;
1025         } else {
1026                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1027                 INIT_LIST_HEAD(&export->exp_obd_chain);
1028         }
1029         spin_unlock(&obd->obd_dev_lock);
1030         RETURN(export);
1031
1032 exit_unlock:
1033         spin_unlock(&obd->obd_dev_lock);
1034         class_handle_unhash(&export->exp_handle);
1035         obd_destroy_export(export);
1036         OBD_FREE_PTR(export);
1037         return ERR_PTR(rc);
1038 }
1039
1040 struct obd_export *class_new_export(struct obd_device *obd,
1041                                     struct obd_uuid *uuid)
1042 {
1043         return __class_new_export(obd, uuid, false);
1044 }
1045 EXPORT_SYMBOL(class_new_export);
1046
1047 struct obd_export *class_new_export_self(struct obd_device *obd,
1048                                          struct obd_uuid *uuid)
1049 {
1050         return __class_new_export(obd, uuid, true);
1051 }
1052
1053 void class_unlink_export(struct obd_export *exp)
1054 {
1055         class_handle_unhash(&exp->exp_handle);
1056
1057         if (exp->exp_obd->obd_self_export == exp) {
1058                 class_export_put(exp);
1059                 return;
1060         }
1061
1062         spin_lock(&exp->exp_obd->obd_dev_lock);
1063         /* delete an uuid-export hashitem from hashtables */
1064         if (exp != exp->exp_obd->obd_self_export)
1065                 obd_uuid_del(exp->exp_obd, exp);
1066
1067 #ifdef HAVE_SERVER_SUPPORT
1068         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1069                 struct tg_export_data   *ted = &exp->exp_target_data;
1070                 struct cfs_hash         *hash;
1071
1072                 /* Because obd_gen_hash will not be released until
1073                  * class_cleanup(), so hash should never be NULL here
1074                  */
1075                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1076                 LASSERT(hash != NULL);
1077                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1078                              &exp->exp_gen_hash);
1079                 cfs_hash_putref(hash);
1080         }
1081 #endif /* HAVE_SERVER_SUPPORT */
1082
1083         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1084         list_del_init(&exp->exp_obd_chain_timed);
1085         exp->exp_obd->obd_num_exports--;
1086         spin_unlock(&exp->exp_obd->obd_dev_lock);
1087
1088         /* A reference is kept by obd_stale_exports list */
1089         obd_stale_export_put(exp);
1090 }
1091 EXPORT_SYMBOL(class_unlink_export);
1092
1093 /* Import management functions */
1094 static void obd_zombie_import_free(struct obd_import *imp)
1095 {
1096         struct obd_import_conn *imp_conn;
1097
1098         ENTRY;
1099         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1100                imp->imp_obd->obd_name);
1101
1102         LASSERT(refcount_read(&imp->imp_refcount) == 0);
1103
1104         ptlrpc_connection_put(imp->imp_connection);
1105
1106         while ((imp_conn = list_first_entry_or_null(&imp->imp_conn_list,
1107                                                     struct obd_import_conn,
1108                                                     oic_item)) != NULL) {
1109                 list_del_init(&imp_conn->oic_item);
1110                 ptlrpc_connection_put(imp_conn->oic_conn);
1111                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1112         }
1113
1114         LASSERT(imp->imp_sec == NULL);
1115         LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1116                  imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1117         class_decref(imp->imp_obd, "import", imp);
1118         OBD_FREE_PTR(imp);
1119         EXIT;
1120 }
1121
1122 struct obd_import *class_import_get(struct obd_import *import)
1123 {
1124         refcount_inc(&import->imp_refcount);
1125         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1126                refcount_read(&import->imp_refcount),
1127                import->imp_obd->obd_name);
1128         return import;
1129 }
1130 EXPORT_SYMBOL(class_import_get);
1131
1132 void class_import_put(struct obd_import *imp)
1133 {
1134         ENTRY;
1135
1136         LASSERT(refcount_read(&imp->imp_refcount) > 0);
1137
1138         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1139                refcount_read(&imp->imp_refcount) - 1,
1140                imp->imp_obd->obd_name);
1141
1142         if (refcount_dec_and_test(&imp->imp_refcount)) {
1143                 CDEBUG(D_INFO, "final put import %p\n", imp);
1144                 obd_zombie_import_add(imp);
1145         }
1146
1147         EXIT;
1148 }
1149 EXPORT_SYMBOL(class_import_put);
1150
1151 static void init_imp_at(struct imp_at *at)
1152 {
1153         int i;
1154
1155         at_init(&at->iat_net_latency, 0, 0);
1156         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1157                 /* max service estimates are tracked server side, so dont't
1158                  * use AT history here, just use the last reported val. (But
1159                  * keep hist for proc histogram, worst_ever)
1160                  */
1161                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1162                         AT_FLG_NOHIST);
1163         }
1164 }
1165
1166 static void obd_zombie_imp_cull(struct work_struct *ws)
1167 {
1168         struct obd_import *import;
1169
1170         import = container_of(ws, struct obd_import, imp_zombie_work);
1171         obd_zombie_import_free(import);
1172 }
1173
1174 struct obd_import *class_new_import(struct obd_device *obd)
1175 {
1176         struct obd_import *imp;
1177         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1178
1179         OBD_ALLOC(imp, sizeof(*imp));
1180         if (imp == NULL)
1181                 return NULL;
1182
1183         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1184         INIT_LIST_HEAD(&imp->imp_replay_list);
1185         INIT_LIST_HEAD(&imp->imp_sending_list);
1186         INIT_LIST_HEAD(&imp->imp_delayed_list);
1187         INIT_LIST_HEAD(&imp->imp_committed_list);
1188         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1189         imp->imp_known_replied_xid = 0;
1190         imp->imp_replay_cursor = &imp->imp_committed_list;
1191         spin_lock_init(&imp->imp_lock);
1192         imp->imp_last_success_conn = 0;
1193         imp->imp_state = LUSTRE_IMP_NEW;
1194         imp->imp_obd = class_incref(obd, "import", imp);
1195         rwlock_init(&imp->imp_sec_lock);
1196         init_waitqueue_head(&imp->imp_recovery_waitq);
1197         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1198
1199         if (curr_pid_ns && curr_pid_ns->child_reaper)
1200                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1201         else
1202                 imp->imp_sec_refpid = 1;
1203
1204         refcount_set(&imp->imp_refcount, 2);
1205         atomic_set(&imp->imp_unregistering, 0);
1206         atomic_set(&imp->imp_reqs, 0);
1207         atomic_set(&imp->imp_inflight, 0);
1208         atomic_set(&imp->imp_replay_inflight, 0);
1209         init_waitqueue_head(&imp->imp_replay_waitq);
1210         atomic_set(&imp->imp_inval_count, 0);
1211         atomic_set(&imp->imp_waiting, 0);
1212         INIT_LIST_HEAD(&imp->imp_conn_list);
1213         init_imp_at(&imp->imp_at);
1214
1215         /* the default magic is V2, will be used in connect RPC, and
1216          * then adjusted according to the flags in request/reply.
1217          */
1218         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1219
1220         return imp;
1221 }
1222 EXPORT_SYMBOL(class_new_import);
1223
1224 void class_destroy_import(struct obd_import *import)
1225 {
1226         LASSERT(import != NULL);
1227         LASSERT(import != LP_POISON);
1228
1229         spin_lock(&import->imp_lock);
1230         import->imp_generation++;
1231         spin_unlock(&import->imp_lock);
1232         class_import_put(import);
1233 }
1234 EXPORT_SYMBOL(class_destroy_import);
1235
1236 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1237
1238 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1239 {
1240         spin_lock(&exp->exp_locks_list_guard);
1241
1242         LASSERT(lock->l_exp_refs_nr >= 0);
1243
1244         if (lock->l_exp_refs_target != NULL &&
1245             lock->l_exp_refs_target != exp) {
1246                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1247                               exp, lock, lock->l_exp_refs_target);
1248         }
1249         if ((lock->l_exp_refs_nr++) == 0) {
1250                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1251                 lock->l_exp_refs_target = exp;
1252         }
1253         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1254                lock, exp, lock->l_exp_refs_nr);
1255         spin_unlock(&exp->exp_locks_list_guard);
1256 }
1257 EXPORT_SYMBOL(__class_export_add_lock_ref);
1258
1259 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1260 {
1261         spin_lock(&exp->exp_locks_list_guard);
1262         LASSERT(lock->l_exp_refs_nr > 0);
1263         if (lock->l_exp_refs_target != exp) {
1264                 LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
1265                               lock, lock->l_exp_refs_target, exp);
1266         }
1267         if (-- lock->l_exp_refs_nr == 0) {
1268                 list_del_init(&lock->l_exp_refs_link);
1269                 lock->l_exp_refs_target = NULL;
1270         }
1271         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1272                lock, exp, lock->l_exp_refs_nr);
1273         spin_unlock(&exp->exp_locks_list_guard);
1274 }
1275 EXPORT_SYMBOL(__class_export_del_lock_ref);
1276 #endif
1277
1278 /* A connection defines an export context in which preallocation can be
1279  * managed. This releases the export pointer reference, and returns the export
1280  * handle, so the export refcount is 1 when this function returns.
1281  */
1282 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1283                   struct obd_uuid *cluuid)
1284 {
1285         struct obd_export *export;
1286
1287         LASSERT(conn != NULL);
1288         LASSERT(obd != NULL);
1289         LASSERT(cluuid != NULL);
1290         ENTRY;
1291
1292         export = class_new_export(obd, cluuid);
1293         if (IS_ERR(export))
1294                 RETURN(PTR_ERR(export));
1295
1296         conn->cookie = export->exp_handle.h_cookie;
1297         class_export_put(export);
1298
1299         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1300                cluuid->uuid, conn->cookie);
1301         RETURN(0);
1302 }
1303 EXPORT_SYMBOL(class_connect);
1304
1305 /* if export is involved in recovery then clean up related things */
1306 static void class_export_recovery_cleanup(struct obd_export *exp)
1307 {
1308         struct obd_device *obd = exp->exp_obd;
1309
1310         spin_lock(&obd->obd_recovery_task_lock);
1311         if (obd->obd_recovering) {
1312                 if (exp->exp_in_recovery) {
1313                         spin_lock(&exp->exp_lock);
1314                         exp->exp_in_recovery = 0;
1315                         spin_unlock(&exp->exp_lock);
1316                         LASSERT(atomic_read(&(obd)->obd_connected_clients) > 0);
1317                         atomic_dec(&obd->obd_connected_clients);
1318                 }
1319
1320                 /* if called during recovery then should update
1321                  * obd_stale_clients counter, lightweight exports is not counted
1322                  */
1323                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1324                         exp->exp_obd->obd_stale_clients++;
1325         }
1326         spin_unlock(&obd->obd_recovery_task_lock);
1327
1328         spin_lock(&exp->exp_lock);
1329         /** Cleanup req replay fields */
1330         if (exp->exp_req_replay_needed) {
1331                 exp->exp_req_replay_needed = 0;
1332
1333                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1334                 atomic_dec(&obd->obd_req_replay_clients);
1335         }
1336
1337         /** Cleanup lock replay data */
1338         if (exp->exp_lock_replay_needed) {
1339                 exp->exp_lock_replay_needed = 0;
1340
1341                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1342                 atomic_dec(&obd->obd_lock_replay_clients);
1343         }
1344         spin_unlock(&exp->exp_lock);
1345 }
1346
1347 /* This function removes 1-3 references from the export:
1348  * 1 - for export pointer passed
1349  * and if disconnect really need
1350  * 2 - removing from hash
1351  * 3 - in client_unlink_export
1352  * The export pointer passed to this function can destroyed
1353  */
1354 int class_disconnect(struct obd_export *export)
1355 {
1356         int already_disconnected;
1357
1358         ENTRY;
1359
1360         if (export == NULL) {
1361                 CWARN("attempting to free NULL export %p\n", export);
1362                 RETURN(-EINVAL);
1363         }
1364
1365         spin_lock(&export->exp_lock);
1366         already_disconnected = export->exp_disconnected;
1367         export->exp_disconnected = 1;
1368 #ifdef HAVE_SERVER_SUPPORT
1369         /*  We hold references of export for uuid hash and nid_hash and export
1370          *  link at least. So it is safe to call rh*table_remove_fast in there.
1371          */
1372         obd_nid_del(export->exp_obd, export);
1373 #endif /* HAVE_SERVER_SUPPORT */
1374         spin_unlock(&export->exp_lock);
1375
1376         /* class_cleanup(), abort_recovery(), and class_fail_export() all end up
1377          * here, and any of them race we shouldn't call extra class_export_puts
1378          */
1379         if (already_disconnected)
1380                 GOTO(no_disconn, already_disconnected);
1381
1382         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1383                export->exp_handle.h_cookie);
1384
1385         class_export_recovery_cleanup(export);
1386         class_unlink_export(export);
1387 no_disconn:
1388         class_export_put(export);
1389         RETURN(0);
1390 }
1391 EXPORT_SYMBOL(class_disconnect);
1392
1393 /* Return non-zero for a fully connected export */
1394 int class_connected_export(struct obd_export *exp)
1395 {
1396         int connected = 0;
1397
1398         if (exp) {
1399                 spin_lock(&exp->exp_lock);
1400                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1401                 spin_unlock(&exp->exp_lock);
1402         }
1403         return connected;
1404 }
1405 EXPORT_SYMBOL(class_connected_export);
1406
1407 static void class_disconnect_export_list(struct list_head *list,
1408                                          enum obd_option flags)
1409 {
1410         int rc;
1411         struct obd_export *exp;
1412
1413         ENTRY;
1414
1415         /* It's possible that an export may disconnect itself, but
1416          * nothing else will be added to this list.
1417          */
1418         while ((exp = list_first_entry_or_null(list, struct obd_export,
1419                                                exp_obd_chain)) != NULL) {
1420                 /* need for safe call CDEBUG after obd_disconnect */
1421                 class_export_get(exp);
1422
1423                 spin_lock(&exp->exp_lock);
1424                 exp->exp_flags = flags;
1425                 spin_unlock(&exp->exp_lock);
1426
1427                 if (obd_uuid_equals(&exp->exp_client_uuid,
1428                                     &exp->exp_obd->obd_uuid)) {
1429                         CDEBUG(D_HA,
1430                                "exp %p export uuid == obd uuid, don't discon\n",
1431                                exp);
1432                         /* Need to delete this now so we don't end up pointing
1433                          * to work_list later when this export is cleaned up.
1434                          */
1435                         list_del_init(&exp->exp_obd_chain);
1436                         class_export_put(exp);
1437                         continue;
1438                 }
1439
1440                 class_export_get(exp);
1441                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), last request at %lld\n",
1442                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1443                        exp, exp->exp_last_request_time);
1444                 /* release one export reference anyway */
1445                 rc = obd_disconnect(exp);
1446
1447                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1448                        obd_export_nid2str(exp), exp, rc);
1449                 class_export_put(exp);
1450         }
1451         EXIT;
1452 }
1453
1454 void class_disconnect_exports(struct obd_device *obd)
1455 {
1456         LIST_HEAD(work_list);
1457
1458         ENTRY;
1459
1460         /* Move all of the exports from obd_exports to a work list, en masse. */
1461         spin_lock(&obd->obd_dev_lock);
1462         list_splice_init(&obd->obd_exports, &work_list);
1463         list_splice_init(&obd->obd_delayed_exports, &work_list);
1464         spin_unlock(&obd->obd_dev_lock);
1465
1466         if (!list_empty(&work_list)) {
1467                 CDEBUG(D_HA, "OBD device %d (%p) has exports, disconnecting them\n",
1468                        obd->obd_minor, obd);
1469                 class_disconnect_export_list(&work_list,
1470                                              exp_flags_from_obd(obd));
1471         } else
1472                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1473                        obd->obd_minor, obd);
1474         EXIT;
1475 }
1476 EXPORT_SYMBOL(class_disconnect_exports);
1477
1478 /* Remove exports that have not completed recovery.
1479  */
1480 void class_disconnect_stale_exports(struct obd_device *obd,
1481                                     int (*test_export)(struct obd_export *))
1482 {
1483         LIST_HEAD(work_list);
1484         struct obd_export *exp, *n;
1485         int evicted = 0;
1486
1487         ENTRY;
1488
1489         spin_lock(&obd->obd_dev_lock);
1490         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1491                                  exp_obd_chain) {
1492                 /* don't count self-export as client */
1493                 if (obd_uuid_equals(&exp->exp_client_uuid,
1494                                     &exp->exp_obd->obd_uuid))
1495                         continue;
1496
1497                 /* don't evict clients which have no slot in last_rcvd
1498                  * (e.g. lightweight connection)
1499                  */
1500                 if (exp->exp_target_data.ted_lr_idx == -1)
1501                         continue;
1502
1503                 spin_lock(&exp->exp_lock);
1504                 if (exp->exp_failed || test_export(exp)) {
1505                         spin_unlock(&exp->exp_lock);
1506                         continue;
1507                 }
1508                 exp->exp_failed = 1;
1509                 atomic_inc(&exp->exp_obd->obd_eviction_count);
1510                 spin_unlock(&exp->exp_lock);
1511
1512                 list_move(&exp->exp_obd_chain, &work_list);
1513                 evicted++;
1514                 CWARN("%s: disconnect stale client %s@%s\n",
1515                       obd->obd_name, exp->exp_client_uuid.uuid,
1516                       obd_export_nid2str(exp));
1517                 print_export_data(exp, "EVICTING", 0, D_HA);
1518         }
1519         spin_unlock(&obd->obd_dev_lock);
1520
1521         if (evicted)
1522                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1523                               obd->obd_name, evicted);
1524
1525         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1526                                                  OBD_OPT_ABORT_RECOV);
1527         EXIT;
1528 }
1529 EXPORT_SYMBOL(class_disconnect_stale_exports);
1530
1531 void class_fail_export(struct obd_export *exp)
1532 {
1533         int rc, already_failed;
1534
1535         spin_lock(&exp->exp_lock);
1536         already_failed = exp->exp_failed;
1537         exp->exp_failed = 1;
1538         spin_unlock(&exp->exp_lock);
1539
1540         if (already_failed) {
1541                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1542                        exp, exp->exp_client_uuid.uuid);
1543                 return;
1544         }
1545
1546         atomic_inc(&exp->exp_obd->obd_eviction_count);
1547
1548         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1549                exp, exp->exp_client_uuid.uuid);
1550
1551         if (obd_dump_on_timeout)
1552                 libcfs_debug_dumplog();
1553
1554         /* need for safe call CDEBUG after obd_disconnect */
1555         class_export_get(exp);
1556
1557         /* Callers into obd_disconnect are removing their own ref(eg request) in
1558          * addition to one from hash table. We don't have such a ref so make one
1559          */
1560         class_export_get(exp);
1561         rc = obd_disconnect(exp);
1562         if (rc)
1563                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1564         else
1565                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1566                        exp, exp->exp_client_uuid.uuid);
1567         class_export_put(exp);
1568 }
1569 EXPORT_SYMBOL(class_fail_export);
1570
1571 #ifdef HAVE_SERVER_SUPPORT
1572
1573 static int take_first(struct obd_export *exp, void *data)
1574 {
1575         struct obd_export **expp = data;
1576
1577         if (*expp)
1578                 /* already have one */
1579                 return 0;
1580         if (exp->exp_failed)
1581                 /* Don't want this one */
1582                 return 0;
1583         if (!refcount_inc_not_zero(&exp->exp_handle.h_ref))
1584                 /* Cannot get a ref on this one */
1585                 return 0;
1586         *expp = exp;
1587         return 1;
1588 }
1589
1590 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1591 {
1592         struct lnet_nid nid_key;
1593         struct obd_export *doomed_exp;
1594         int exports_evicted = 0;
1595
1596         libcfs_strnid(&nid_key, nid);
1597
1598         spin_lock(&obd->obd_dev_lock);
1599         /* umount already run. evict thread should stop leaving unmount thread
1600          * to take over
1601          */
1602         if (obd->obd_stopping) {
1603                 spin_unlock(&obd->obd_dev_lock);
1604                 return exports_evicted;
1605         }
1606         spin_unlock(&obd->obd_dev_lock);
1607
1608         doomed_exp = NULL;
1609         while (obd_nid_export_for_each(obd, &nid_key,
1610                                        take_first, &doomed_exp) > 0) {
1611
1612                 LASSERTF(doomed_exp != obd->obd_self_export,
1613                          "self-export is hashed by NID?\n");
1614
1615                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1616                               obd->obd_name,
1617                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1618                               obd_export_nid2str(doomed_exp));
1619
1620                 class_fail_export(doomed_exp);
1621                 class_export_put(doomed_exp);
1622                 exports_evicted++;
1623                 doomed_exp = NULL;
1624         }
1625
1626         if (!exports_evicted)
1627                 CDEBUG(D_HA,
1628                        "%s: can't disconnect NID '%s': no exports found\n",
1629                        obd->obd_name, nid);
1630         return exports_evicted;
1631 }
1632 EXPORT_SYMBOL(obd_export_evict_by_nid);
1633
1634 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1635 {
1636         struct obd_export *doomed_exp = NULL;
1637         struct obd_uuid doomed_uuid;
1638         int exports_evicted = 0;
1639
1640         spin_lock(&obd->obd_dev_lock);
1641         if (obd->obd_stopping) {
1642                 spin_unlock(&obd->obd_dev_lock);
1643                 return exports_evicted;
1644         }
1645         spin_unlock(&obd->obd_dev_lock);
1646
1647         obd_str2uuid(&doomed_uuid, uuid);
1648         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1649                 CERROR("%s: can't evict myself\n", obd->obd_name);
1650                 return exports_evicted;
1651         }
1652
1653         doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1654         if (doomed_exp == NULL) {
1655                 CERROR("%s: can't disconnect %s: no exports found\n",
1656                        obd->obd_name, uuid);
1657         } else {
1658                 CWARN("%s: evicting %s at adminstrative request\n",
1659                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1660                 class_fail_export(doomed_exp);
1661                 class_export_put(doomed_exp);
1662                 obd_uuid_del(obd, doomed_exp);
1663                 exports_evicted++;
1664         }
1665
1666         return exports_evicted;
1667 }
1668 #endif /* HAVE_SERVER_SUPPORT */
1669
1670 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional hook called by print_export_data() for an export when lock
 * dumping is requested. Starts out NULL (static storage is
 * zero-initialised), which means no hook is installed.
 */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
1673 #endif
1674
1675 static void print_export_data(struct obd_export *exp, const char *status,
1676                               int locks, int debug_level)
1677 {
1678         struct ptlrpc_reply_state *rs;
1679         struct ptlrpc_reply_state *first_reply = NULL;
1680         int nreplies = 0;
1681
1682         spin_lock(&exp->exp_lock);
1683         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1684                             rs_exp_list) {
1685                 if (nreplies == 0)
1686                         first_reply = rs;
1687                 nreplies++;
1688         }
1689         spin_unlock(&exp->exp_lock);
1690
1691         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s %llu stale:%d\n",
1692                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1693                obd_export_nid2str(exp),
1694                refcount_read(&exp->exp_handle.h_ref),
1695                atomic_read(&exp->exp_rpc_count),
1696                atomic_read(&exp->exp_cb_count),
1697                atomic_read(&exp->exp_locks_count),
1698                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1699                nreplies, first_reply, nreplies > 3 ? "..." : "",
1700                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1701 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1702         if (locks && class_export_dump_hook != NULL)
1703                 class_export_dump_hook(exp);
1704 #endif
1705 }
1706
1707 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1708 {
1709         struct obd_export *exp;
1710
1711         spin_lock(&obd->obd_dev_lock);
1712         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1713                 print_export_data(exp, "ACTIVE", locks, debug_level);
1714         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1715                 print_export_data(exp, "UNLINKED", locks, debug_level);
1716         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1717                 print_export_data(exp, "DELAYED", locks, debug_level);
1718         spin_unlock(&obd->obd_dev_lock);
1719 }
1720
1721 void obd_exports_barrier(struct obd_device *obd)
1722 {
1723         int waited = 2;
1724
1725         LASSERT(list_empty(&obd->obd_exports));
1726         spin_lock(&obd->obd_dev_lock);
1727         while (!list_empty(&obd->obd_unlinked_exports)) {
1728                 spin_unlock(&obd->obd_dev_lock);
1729                 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1730                 if (waited > 5 && is_power_of_2(waited)) {
1731                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports more than %d seconds. The obd refcount = %d. Is it stuck?\n",
1732                                       obd->obd_name, waited,
1733                                       kref_read(&obd->obd_refcount));
1734                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1735                 }
1736                 waited *= 2;
1737                 spin_lock(&obd->obd_dev_lock);
1738         }
1739         spin_unlock(&obd->obd_dev_lock);
1740 }
1741 EXPORT_SYMBOL(obd_exports_barrier);
1742
1743 /* Add export to the obd_zombe thread and notify it. */
1744 static void obd_zombie_export_add(struct obd_export *exp)
1745 {
1746         atomic_inc(&obd_stale_export_num);
1747         spin_lock(&exp->exp_obd->obd_dev_lock);
1748         LASSERT(!list_empty(&exp->exp_obd_chain));
1749         list_del_init(&exp->exp_obd_chain);
1750         spin_unlock(&exp->exp_obd->obd_dev_lock);
1751         queue_work(zombie_wq, &exp->exp_zombie_work);
1752 }
1753
1754 /* Add import to the obd_zombe thread and notify it. */
1755 static void obd_zombie_import_add(struct obd_import *imp)
1756 {
1757         LASSERT(imp->imp_sec == NULL);
1758
1759         queue_work(zombie_wq, &imp->imp_zombie_work);
1760 }
1761
1762 /* wait when obd_zombie import/export queues become empty */
1763 void obd_zombie_barrier(void)
1764 {
1765         wait_var_event(&obd_stale_export_num,
1766                         atomic_read(&obd_stale_export_num) == 0);
1767         flush_workqueue(zombie_wq);
1768 }
1769 EXPORT_SYMBOL(obd_zombie_barrier);
1770
1771
1772 struct obd_export *obd_stale_export_get(void)
1773 {
1774         struct obd_export *exp = NULL;
1775
1776         ENTRY;
1777
1778         spin_lock(&obd_stale_export_lock);
1779         if (!list_empty(&obd_stale_exports)) {
1780                 exp = list_first_entry(&obd_stale_exports,
1781                                        struct obd_export, exp_stale_list);
1782                 list_del_init(&exp->exp_stale_list);
1783         }
1784         spin_unlock(&obd_stale_export_lock);
1785
1786         if (exp) {
1787                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1788                        atomic_read(&obd_stale_export_num));
1789         }
1790         RETURN(exp);
1791 }
1792 EXPORT_SYMBOL(obd_stale_export_get);
1793
1794 void obd_stale_export_put(struct obd_export *exp)
1795 {
1796         ENTRY;
1797
1798         LASSERT(list_empty(&exp->exp_stale_list));
1799         if (exp->exp_lock_hash &&
1800             atomic_read(&exp->exp_lock_hash->hs_count)) {
1801                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1802                        atomic_read(&obd_stale_export_num));
1803
1804                 spin_lock_bh(&exp->exp_bl_list_lock);
1805                 spin_lock(&obd_stale_export_lock);
1806                 /* Add to the tail if there is no blocked locks,
1807                  * to the head otherwise.
1808                  */
1809                 if (list_empty(&exp->exp_bl_list))
1810                         list_add_tail(&exp->exp_stale_list,
1811                                       &obd_stale_exports);
1812                 else
1813                         list_add(&exp->exp_stale_list,
1814                                  &obd_stale_exports);
1815
1816                 spin_unlock(&obd_stale_export_lock);
1817                 spin_unlock_bh(&exp->exp_bl_list_lock);
1818         } else {
1819                 class_export_put(exp);
1820         }
1821         EXIT;
1822 }
1823 EXPORT_SYMBOL(obd_stale_export_put);
1824
1825 /**
1826  * Adjust the position of the export in the stale list,
1827  * i.e. move to the head of the list if is needed.
1828  **/
1829 void obd_stale_export_adjust(struct obd_export *exp)
1830 {
1831         LASSERT(exp != NULL);
1832         spin_lock_bh(&exp->exp_bl_list_lock);
1833         spin_lock(&obd_stale_export_lock);
1834
1835         if (!list_empty(&exp->exp_stale_list) &&
1836             !list_empty(&exp->exp_bl_list))
1837                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1838
1839         spin_unlock(&obd_stale_export_lock);
1840         spin_unlock_bh(&exp->exp_bl_list_lock);
1841 }
1842 EXPORT_SYMBOL(obd_stale_export_adjust);
1843
1844 /* start destroy zombie import/export thread */
1845 int obd_zombie_impexp_init(void)
1846 {
1847         zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
1848                                            0, CFS_CPT_ANY,
1849                                            cfs_cpt_number(cfs_cpt_tab));
1850
1851         return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
1852 }
1853
1854 /* stop destroy zombie import/export thread */
1855 void obd_zombie_impexp_stop(void)
1856 {
1857         destroy_workqueue(zombie_wq);
1858         LASSERT(list_empty(&obd_stale_exports));
1859 }
1860
1861 /***** Kernel-userspace comm helpers *******/
1862
1863 /* Get length of entire message, including header */
1864 int kuc_len(int payload_len)
1865 {
1866         return sizeof(struct kuc_hdr) + payload_len;
1867 }
1868 EXPORT_SYMBOL(kuc_len);
1869
1870 /* Get a pointer to kuc header, given a ptr to the payload
1871  * @param p Pointer to payload area
1872  * @returns Pointer to kuc header
1873  */
1874 struct kuc_hdr *kuc_ptr(void *p)
1875 {
1876         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1877
1878         LASSERT(lh->kuc_magic == KUC_MAGIC);
1879         return lh;
1880 }
1881 EXPORT_SYMBOL(kuc_ptr);
1882
1883 /* Alloc space for a message, and fill in header
1884  * @return Pointer to payload area
1885  */
1886 void *kuc_alloc(int payload_len, int transport, int type)
1887 {
1888         struct kuc_hdr *lh;
1889         int len = kuc_len(payload_len);
1890
1891         OBD_ALLOC(lh, len);
1892         if (lh == NULL)
1893                 return ERR_PTR(-ENOMEM);
1894
1895         lh->kuc_magic = KUC_MAGIC;
1896         lh->kuc_transport = transport;
1897         lh->kuc_msgtype = type;
1898         lh->kuc_msglen = len;
1899
1900         return (void *)(lh + 1);
1901 }
1902 EXPORT_SYMBOL(kuc_alloc);
1903
/* Release a message obtained from kuc_alloc().
 * @p is the payload pointer kuc_alloc() returned and @payload_len the
 * payload size it was allocated with; the header in front of @p is looked
 * up (and its magic checked) via kuc_ptr() and freed together with the
 * payload.
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *hdr;

        hdr = kuc_ptr(p);
        OBD_FREE(hdr, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1912
/* Record for one thread waiting for an RPC slot in
 * obd_get_request_slot(), where it lives on the waiter's stack.
 */
struct obd_request_slot_waiter {
        /* Linkage on cli->cl_flight_waiters; unlinked (left empty) by
         * whoever hands a slot to this waiter.
         */
        struct list_head        orsw_entry;
        /* The waiting thread sleeps on this queue. */
        wait_queue_head_t       orsw_waitq;
        /* Cleared before waiting and checked afterwards.
         * NOTE(review): nothing in this part of the file sets it to
         * true -- confirm where (or whether) it is ever set.
         */
        bool                    orsw_signaled;
};
1918
1919 static bool obd_request_slot_avail(struct client_obd *cli,
1920                                    struct obd_request_slot_waiter *orsw)
1921 {
1922         bool avail;
1923
1924         spin_lock(&cli->cl_loi_list_lock);
1925         avail = !!list_empty(&orsw->orsw_entry);
1926         spin_unlock(&cli->cl_loi_list_lock);
1927
1928         return avail;
1929 };
1930
1931 /*
1932  * For network flow control, the RPC sponsor needs to acquire a credit
1933  * before sending the RPC. The credits count for a connection is defined
1934  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1935  * the subsequent RPC sponsors need to wait until others released their
1936  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1937  */
1938 int obd_get_request_slot(struct client_obd *cli)
1939 {
1940         struct obd_request_slot_waiter   orsw;
1941         int                              rc;
1942
1943         spin_lock(&cli->cl_loi_list_lock);
1944         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
1945                 cli->cl_rpcs_in_flight++;
1946                 spin_unlock(&cli->cl_loi_list_lock);
1947                 return 0;
1948         }
1949
1950         init_waitqueue_head(&orsw.orsw_waitq);
1951         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
1952         orsw.orsw_signaled = false;
1953         spin_unlock(&cli->cl_loi_list_lock);
1954
1955         rc = l_wait_event_abortable(orsw.orsw_waitq,
1956                                     obd_request_slot_avail(cli, &orsw) ||
1957                                     orsw.orsw_signaled);
1958
1959         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1960          * freed but other (such as obd_put_request_slot) is using it.
1961          */
1962         spin_lock(&cli->cl_loi_list_lock);
1963         if (rc != 0) {
1964                 if (!orsw.orsw_signaled) {
1965                         if (list_empty(&orsw.orsw_entry))
1966                                 cli->cl_rpcs_in_flight--;
1967                         else
1968                                 list_del(&orsw.orsw_entry);
1969                 }
1970                 rc = -EINTR;
1971         }
1972
1973         if (orsw.orsw_signaled) {
1974                 LASSERT(list_empty(&orsw.orsw_entry));
1975
1976                 rc = -EINTR;
1977         }
1978         spin_unlock(&cli->cl_loi_list_lock);
1979
1980         return rc;
1981 }
1982 EXPORT_SYMBOL(obd_get_request_slot);
1983
1984 void obd_put_request_slot(struct client_obd *cli)
1985 {
1986         struct obd_request_slot_waiter *orsw;
1987
1988         spin_lock(&cli->cl_loi_list_lock);
1989         cli->cl_rpcs_in_flight--;
1990
1991         /* If there is free slot, wakeup the first waiter. */
1992         if (!list_empty(&cli->cl_flight_waiters) &&
1993             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
1994                 orsw = list_first_entry(&cli->cl_flight_waiters,
1995                                         struct obd_request_slot_waiter,
1996                                         orsw_entry);
1997                 list_del_init(&orsw->orsw_entry);
1998                 cli->cl_rpcs_in_flight++;
1999                 wake_up(&orsw->orsw_waitq);
2000         }
2001         spin_unlock(&cli->cl_loi_list_lock);
2002 }
2003 EXPORT_SYMBOL(obd_put_request_slot);
2004
2005 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2006 {
2007         return cli->cl_max_rpcs_in_flight;
2008 }
2009 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2010
2011 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2012 {
2013         struct obd_request_slot_waiter *orsw;
2014         __u32                           old;
2015         int                             diff;
2016         int                             i;
2017         int                             rc;
2018
2019         if (max > OBD_MAX_RIF_MAX || max < 1)
2020                 return -ERANGE;
2021
2022         CDEBUG(D_INFO, "%s: max = %u max_mod = %u rif = %u\n",
2023                cli->cl_import->imp_obd->obd_name, max,
2024                cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
2025
2026         if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
2027                    LUSTRE_MDC_NAME) == 0) {
2028                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2029                  * strictly lower that max_rpcs_in_flight
2030                  */
2031                 if (max < 2) {
2032                         CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2033                                cli->cl_import->imp_obd->obd_name);
2034                         return -ERANGE;
2035                 }
2036                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2037                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2038                         if (rc != 0)
2039                                 return rc;
2040                 }
2041         }
2042
2043         spin_lock(&cli->cl_loi_list_lock);
2044         old = cli->cl_max_rpcs_in_flight;
2045         cli->cl_max_rpcs_in_flight = max;
2046         client_adjust_max_dirty(cli);
2047
2048         diff = max - old;
2049
2050         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2051         for (i = 0; i < diff; i++) {
2052                 orsw = list_first_entry_or_null(&cli->cl_loi_read_list,
2053                                                 struct obd_request_slot_waiter,
2054                                                 orsw_entry);
2055                 if (!orsw)
2056                         break;
2057
2058                 list_del_init(&orsw->orsw_entry);
2059                 cli->cl_rpcs_in_flight++;
2060                 wake_up(&orsw->orsw_waitq);
2061         }
2062         spin_unlock(&cli->cl_loi_list_lock);
2063
2064         return 0;
2065 }
2066 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2067
2068 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2069 {
2070         return cli->cl_max_mod_rpcs_in_flight;
2071 }
2072 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2073
2074 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2075 {
2076         struct obd_connect_data *ocd;
2077         __u16 maxmodrpcs;
2078         __u16 prev;
2079
2080         if (max > OBD_MAX_RIF_MAX || max < 1)
2081                 return -ERANGE;
2082
2083         ocd = &cli->cl_import->imp_connect_data;
2084         CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
2085                cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
2086                ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
2087
2088         if (max == OBD_MAX_RIF_MAX)
2089                 max = OBD_MAX_RIF_MAX - 1;
2090
2091         /* Cannot exceed or equal max_rpcs_in_flight.  If we are asked to
2092          * increase this value, also bump up max_rpcs_in_flight to match.
2093          */
2094         if (max >= cli->cl_max_rpcs_in_flight) {
2095                 CDEBUG(D_INFO,
2096                        "%s: increasing max_rpcs_in_flight=%u to allow larger max_mod_rpcs_in_flight=%u\n",
2097                        cli->cl_import->imp_obd->obd_name, max + 1, max);
2098                 obd_set_max_rpcs_in_flight(cli, max + 1);
2099         }
2100
2101         /* cannot exceed max modify RPCs in flight supported by the server,
2102          * but verify ocd_connect_flags is at least initialized first.  If
2103          * not, allow it and fix value later in ptlrpc_connect_set_flags().
2104          */
2105         if (!ocd->ocd_connect_flags) {
2106                 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2107         } else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
2108                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2109                 if (maxmodrpcs == 0) { /* connection not finished yet */
2110                         maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2111                         CDEBUG(D_INFO,
2112                                "%s: partial connect, assume maxmodrpcs=%hu\n",
2113                                cli->cl_import->imp_obd->obd_name, maxmodrpcs);
2114                 }
2115         } else {
2116                 maxmodrpcs = 1;
2117         }
2118         if (max > maxmodrpcs) {
2119                 CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than mdt.*.max_mod_rpcs_in_flight=%hu returned by the MDT server at connection.\n",
2120                        cli->cl_import->imp_obd->obd_name,
2121                        max, maxmodrpcs);
2122                 return -ERANGE;
2123         }
2124
2125         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2126
2127         prev = cli->cl_max_mod_rpcs_in_flight;
2128         cli->cl_max_mod_rpcs_in_flight = max;
2129
2130         /* wakeup waiters if limit has been increased */
2131         if (cli->cl_max_mod_rpcs_in_flight > prev)
2132                 wake_up_locked(&cli->cl_mod_rpcs_waitq);
2133
2134         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2135
2136         return 0;
2137 }
2138 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2139
2140 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2141                                struct seq_file *seq)
2142 {
2143         unsigned long mod_tot = 0, mod_cum;
2144         int i;
2145
2146         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2147         lprocfs_stats_header(seq, ktime_get_real(), cli->cl_mod_rpcs_init, 25,
2148                              ":", true, "");
2149         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2150                    cli->cl_mod_rpcs_in_flight);
2151
2152         seq_puts(seq, "\n\t\t\tmodify\n");
2153         seq_puts(seq, "rpcs in flight        rpcs   %% cum %%\n");
2154
2155         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2156
2157         mod_cum = 0;
2158         for (i = 0; i < OBD_HIST_MAX; i++) {
2159                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2160
2161                 mod_cum += mod;
2162                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2163                            i, mod, pct(mod, mod_tot),
2164                            pct(mod_cum, mod_tot));
2165                 if (mod_cum == mod_tot)
2166                         break;
2167         }
2168
2169         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2170
2171         return 0;
2172 }
2173 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2174
/* The number of modify RPCs sent in parallel is limited
 * because the server has a finite number of slots per client to
 * store request result and ensure reply reconstruction when needed.
 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
 * that takes into account server limit and cl_max_rpcs_in_flight
 * value.
 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
 * one close request is allowed above the maximum.
 *
 * struct mod_waiter is the per-caller wait state used by
 * obd_get_mod_rpc_slot() while it acquires such a slot.
 */
struct mod_waiter {
        /* client whose modify-RPC counters are checked and updated */
        struct client_obd *cli;
        /* true when the request being sent is an MDS_CLOSE */
        bool close_req;
        /* set by claim_mod_rpc_function() once a slot has been claimed */
        bool woken;
        /* entry queued on cli->cl_mod_rpcs_waitq */
        wait_queue_entry_t wqe;
};
/* Wait-queue callback that tries to claim a modify RPC slot for a waiter.
 *
 * Installed as the wake function of a struct mod_waiter's queue entry by
 * obd_get_mod_rpc_slot().  When a slot is available the in-flight counters
 * are bumped on behalf of the waiter, the waiter is woken through
 * woken_wake_function() and marked as served.  @key is non-NULL when the
 * wake-up was triggered by a close request being queued or completed.
 *
 * Returns 0 if this waiter was already served or the scan of the queue
 * should go on, the result of woken_wake_function() when a slot was
 * claimed, or -1 when looking at further waiters is pointless.
 *
 * NOTE(review): every wake-up site visible in this file holds
 * cl_mod_rpcs_waitq.lock around the call, which is what protects the
 * counters updated here -- confirm for callers elsewhere.
 */
static int claim_mod_rpc_function(wait_queue_entry_t *wq_entry,
                                  unsigned int mode, int flags, void *key)
{
        struct mod_waiter *w = container_of(wq_entry, struct mod_waiter, wqe);
        struct client_obd *cli = w->cli;
        bool close_req = w->close_req;
        bool avail;
        int ret;

        /* As woken_wake_function() doesn't remove us from the wait_queue,
         * we use own flag to ensure we're called just once.
         */
        if (w->woken)
                return 0;

        /* A slot is available if
         * - number of modify RPCs in flight is less than the max
         * - it's a close RPC and no other close request is in flight
         */
        avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
                (close_req && cli->cl_close_rpcs_in_flight == 0);
        if (avail) {
                /* Account for the slot before waking its new owner. */
                cli->cl_mod_rpcs_in_flight++;
                if (close_req)
                        cli->cl_close_rpcs_in_flight++;
                ret = woken_wake_function(wq_entry, mode, flags, key);
                w->woken = true;
        } else if (cli->cl_close_rpcs_in_flight)
                /* No other waiter could be woken */
                ret = -1;
        else if (!key)
                /* This was not a wakeup from a close completion or a new close
                 * being queued, so there is no point seeing if there are close
                 * waiters to be woken.
                 */
                ret = -1;
        else
                /* There might be a close we could wake, keep looking */
                ret = 0;
        return ret;
}
2231
2232 /* Get a modify RPC slot from the obd client @cli according
2233  * to the kind of operation @opc that is going to be sent
2234  * and the intent @it of the operation if it applies.
2235  * If the maximum number of modify RPCs in flight is reached
2236  * the thread is put to sleep.
2237  * Returns the tag to be set in the request message. Tag 0
2238  * is reserved for non-modifying requests.
2239  */
2240 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2241 {
2242         struct mod_waiter wait = {
2243                 .cli = cli,
2244                 .close_req = (opc == MDS_CLOSE),
2245                 .woken = false,
2246         };
2247         __u16                   i, max;
2248
2249         init_wait(&wait.wqe);
2250         wait.wqe.func = claim_mod_rpc_function;
2251
2252         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2253         __add_wait_queue_entry_tail(&cli->cl_mod_rpcs_waitq, &wait.wqe);
2254         /* This wakeup will only succeed if the maximums haven't
2255          * been reached.  If that happens, wait.woken will be set
2256          * and there will be no need to wait.
2257          * If a close_req was enqueue, ensure we search all the way to the
2258          * end of the waitqueue for a close request.
2259          */
2260         __wake_up_locked_key(&cli->cl_mod_rpcs_waitq, TASK_NORMAL,
2261                              (void*)wait.close_req);
2262
2263         while (wait.woken == false) {
2264                 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2265                 wait_woken(&wait.wqe, TASK_UNINTERRUPTIBLE,
2266                            MAX_SCHEDULE_TIMEOUT);
2267                 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2268         }
2269         __remove_wait_queue(&cli->cl_mod_rpcs_waitq, &wait.wqe);
2270
2271         max = cli->cl_max_mod_rpcs_in_flight;
2272         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2273                          cli->cl_mod_rpcs_in_flight);
2274         /* find a free tag */
2275         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2276                                 max + 1);
2277         LASSERT(i < OBD_MAX_RIF_MAX);
2278         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2279         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2280         /* tag 0 is reserved for non-modify RPCs */
2281
2282         CDEBUG(D_RPCTRACE,
2283                "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2284                cli->cl_import->imp_obd->obd_name,
2285                i + 1, opc, max);
2286
2287         return i + 1;
2288 }
2289 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2290
2291 /* Put a modify RPC slot from the obd client @cli according
2292  * to the kind of operation @opc that has been sent.
2293  */
2294 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2295 {
2296         bool                    close_req = false;
2297
2298         if (tag == 0)
2299                 return;
2300
2301         if (opc == MDS_CLOSE)
2302                 close_req = true;
2303
2304         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2305         cli->cl_mod_rpcs_in_flight--;
2306         if (close_req)
2307                 cli->cl_close_rpcs_in_flight--;
2308         /* release the tag in the bitmap */
2309         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2310         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2311         __wake_up_locked_key(&cli->cl_mod_rpcs_waitq, TASK_NORMAL,
2312                              (void *)close_req);
2313         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2314 }
2315 EXPORT_SYMBOL(obd_put_mod_rpc_slot);