Whamcloud - gitweb
5f3eddeb2fb5a4187a1382bb2904b489a3bbe121
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/obdclass/genops.c
32  *
33  * These are the only exported functions, they provide some generic
34  * infrastructure for managing object devices
35  */
36
37 #define DEBUG_SUBSYSTEM S_CLASS
38
39 #include <linux/pid_namespace.h>
40 #include <linux/workqueue.h>
41 #include <lustre_compat.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
47
/* Guards the obd_devs[] slot table below. */
DEFINE_RWLOCK(obd_dev_lock);
/* Registered devices, indexed by obd_minor; an unused slot holds NULL. */
static struct obd_device *obd_devs[MAX_OBD_DEVICES];

/* Slab cache backing obd_device_alloc() / obd_device_free(). */
static struct kmem_cache *obd_device_cachep;
/* kobject type used for every obd_type; initialised further down. */
static struct kobj_type class_ktype;
/* NOTE(review): presumably runs deferred export/import teardown - confirm. */
static struct workqueue_struct *zombie_wq;

/* Forward declarations for helpers defined later in this file. */
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
			      const char *status, int locks, int debug_level);

/*
 * Stale export bookkeeping: a list, the spinlock declared for it and an
 * atomic counter.
 * NOTE(review): their users are outside this section - confirm the locking
 * rules against them.
 */
static LIST_HEAD(obd_stale_exports);
static DEFINE_SPINLOCK(obd_stale_export_lock);
static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
63
64 /*
65  * support functions: we could use inter-module communication, but this
66  * is more portable to other OS's
67  */
68 static struct obd_device *obd_device_alloc(void)
69 {
70         struct obd_device *obd;
71
72         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
73         if (obd != NULL) {
74                 obd->obd_magic = OBD_DEVICE_MAGIC;
75         }
76         return obd;
77 }
78
79 static void obd_device_free(struct obd_device *obd)
80 {
81         LASSERT(obd != NULL);
82         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
83                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
84         if (obd->obd_namespace != NULL) {
85                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
86                        obd, obd->obd_namespace, obd->obd_force);
87                 LBUG();
88         }
89         lu_ref_fini(&obd->obd_reference);
90         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
91 }
92
93 struct obd_type *class_search_type(const char *name)
94 {
95         struct kobject *kobj = kset_find_obj(lustre_kset, name);
96
97         if (kobj && kobj->ktype == &class_ktype)
98                 return container_of(kobj, struct obd_type, typ_kobj);
99
100         kobject_put(kobj);
101         return NULL;
102 }
103 EXPORT_SYMBOL(class_search_type);
104
105 struct obd_type *class_get_type(const char *name)
106 {
107         struct obd_type *type;
108
109         type = class_search_type(name);
110 #ifdef HAVE_MODULE_LOADING_SUPPORT
111         if (!type) {
112                 const char *modname = name;
113
114 #ifdef HAVE_SERVER_SUPPORT
115                 if (strcmp(modname, "obdfilter") == 0)
116                         modname = "ofd";
117
118                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
119                         modname = LUSTRE_OSP_NAME;
120
121                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
122                         modname = LUSTRE_MDT_NAME;
123 #endif /* HAVE_SERVER_SUPPORT */
124
125                 if (!request_module("%s", modname)) {
126                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
127                         type = class_search_type(name);
128                 } else {
129                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
130                                            modname);
131                 }
132         }
133 #endif
134         if (type) {
135                 if (try_module_get(type->typ_dt_ops->o_owner)) {
136                         atomic_inc(&type->typ_refcnt);
137                         /* class_search_type() returned a counted reference,
138                          * but we don't need that count any more as
139                          * we have one through typ_refcnt.
140                          */
141                         kobject_put(&type->typ_kobj);
142                 } else {
143                         kobject_put(&type->typ_kobj);
144                         type = NULL;
145                 }
146         }
147         return type;
148 }
149
150 void class_put_type(struct obd_type *type)
151 {
152         LASSERT(type);
153         module_put(type->typ_dt_ops->o_owner);
154         atomic_dec(&type->typ_refcnt);
155 }
156
157 static void class_sysfs_release(struct kobject *kobj)
158 {
159         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
160
161         debugfs_remove_recursive(type->typ_debugfs_entry);
162         type->typ_debugfs_entry = NULL;
163
164         if (type->typ_lu)
165                 lu_device_type_fini(type->typ_lu);
166
167 #ifdef CONFIG_PROC_FS
168         if (type->typ_name && type->typ_procroot)
169                 remove_proc_subtree(type->typ_name, proc_lustre_root);
170 #endif
171         OBD_FREE(type, sizeof(*type));
172 }
173
/*
 * kobject type attached to every obd_type kobject.  class_search_type()
 * compares against its address to recognise obd_type entries in
 * lustre_kset; the release hook frees the obd_type once the last kobject
 * reference is dropped.
 */
static struct kobj_type class_ktype = {
	.sysfs_ops      = &lustre_sysfs_ops,
	.release        = class_sysfs_release,
};
178
179 #ifdef HAVE_SERVER_SUPPORT
180 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
181 {
182         struct dentry *symlink;
183         struct obd_type *type;
184         int rc;
185
186         type = class_search_type(name);
187         if (type) {
188                 kobject_put(&type->typ_kobj);
189                 return ERR_PTR(-EEXIST);
190         }
191
192         OBD_ALLOC(type, sizeof(*type));
193         if (!type)
194                 return ERR_PTR(-ENOMEM);
195
196         type->typ_kobj.kset = lustre_kset;
197         rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
198                                   &lustre_kset->kobj, "%s", name);
199         if (rc)
200                 return ERR_PTR(rc);
201
202         symlink = debugfs_create_dir(name, debugfs_lustre_root);
203         type->typ_debugfs_entry = symlink;
204         type->typ_sym_filter = true;
205
206         if (enable_proc) {
207                 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
208                                                       NULL, NULL);
209                 if (IS_ERR(type->typ_procroot)) {
210                         CERROR("%s: can't create compat proc entry: %d\n",
211                                name, (int)PTR_ERR(type->typ_procroot));
212                         type->typ_procroot = NULL;
213                 }
214         }
215
216         return type;
217 }
218 EXPORT_SYMBOL(class_add_symlinks);
219 #endif /* HAVE_SERVER_SUPPORT */
220
221 #define CLASS_MAX_NAME 1024
222
/*
 * Register an obd_type called @name with the given operation tables.
 *
 * @dt_ops:      stored in typ_dt_ops
 * @md_ops:      stored in typ_md_ops
 * @enable_proc: create a procfs directory for the type (CONFIG_PROC_FS only)
 * @name:        type name; must be shorter than CLASS_MAX_NAME
 * @ldt:         optional lu_device_type, initialised here when not NULL
 *
 * With HAVE_SERVER_SUPPORT, an existing entry flagged typ_sym_filter (as
 * created by class_add_symlinks()) is adopted rather than rejected.
 *
 * Returns 0 on success, -EEXIST if the name is already registered,
 * -ENOMEM on allocation failure, or the error from lprocfs_register(),
 * kobject_add() or lu_device_type_init().
 */
int class_register_type(const struct obd_ops *dt_ops,
			const struct md_ops *md_ops,
			bool enable_proc,
			const char *name, struct lu_device_type *ldt)
{
	struct obd_type *type;
	int rc;

	ENTRY;
	/* sanity check */
	LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);

	type = class_search_type(name);
	if (type) {
#ifdef HAVE_SERVER_SUPPORT
		/* A placeholder entry is taken over instead of rejected. */
		if (type->typ_sym_filter)
			goto dir_exist;
#endif /* HAVE_SERVER_SUPPORT */
		/* Drop the lookup reference before reporting the clash. */
		kobject_put(&type->typ_kobj);
		CDEBUG(D_IOCTL, "Type %s already registered\n", name);
		RETURN(-EEXIST);
	}

	OBD_ALLOC(type, sizeof(*type));
	if (type == NULL)
		RETURN(-ENOMEM);

	/*
	 * OBD_LU_TYPE_SETUP is stored while @ldt is still being initialised;
	 * the real value is published with smp_store_release() below.
	 */
	type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
	type->typ_kobj.kset = lustre_kset;
	kobject_init(&type->typ_kobj, &class_ktype);
#ifdef HAVE_SERVER_SUPPORT
dir_exist:
#endif /* HAVE_SERVER_SUPPORT */

	type->typ_dt_ops = dt_ops;
	type->typ_md_ops = md_ops;

#ifdef HAVE_SERVER_SUPPORT
	if (type->typ_sym_filter) {
		/*
		 * Adopted placeholder: clear the flag, drop the reference
		 * returned by class_search_type() and skip the proc,
		 * debugfs and kobject_add() steps below.
		 */
		type->typ_sym_filter = false;
		kobject_put(&type->typ_kobj);
		goto setup_ldt;
	}
#endif
#ifdef CONFIG_PROC_FS
	if (enable_proc && !type->typ_procroot) {
		type->typ_procroot = lprocfs_register(name,
						      proc_lustre_root,
						      NULL, type);
		if (IS_ERR(type->typ_procroot)) {
			rc = PTR_ERR(type->typ_procroot);
			/* Keep the release hook from touching an ERR_PTR. */
			type->typ_procroot = NULL;
			GOTO(failed, rc);
		}
	}
#endif
	type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);

	rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
	if (rc)
		GOTO(failed, rc);
#ifdef HAVE_SERVER_SUPPORT
setup_ldt:
#endif
	if (ldt) {
		rc = lu_device_type_init(ldt);
		/* Publish the final typ_lu value, then wake anyone waiting on it. */
		smp_store_release(&type->typ_lu, rc ? NULL : ldt);
		wake_up_var(&type->typ_lu);
		if (rc)
			GOTO(failed, rc);
	}

	RETURN(0);

failed:
	/*
	 * Dropping the kobject reference lets class_sysfs_release() undo
	 * whatever was set up so far.
	 * NOTE(review): when this is reached before lu_device_type_init(),
	 * typ_lu may still hold OBD_LU_TYPE_SETUP, which the release hook
	 * passes to lu_device_type_fini() - confirm that this is safe.
	 */
	kobject_put(&type->typ_kobj);

	RETURN(rc);
}
302 EXPORT_SYMBOL(class_register_type);
303
304 int class_unregister_type(const char *name)
305 {
306         struct obd_type *type = class_search_type(name);
307         int rc = 0;
308         ENTRY;
309
310         if (!type) {
311                 CERROR("unknown obd type\n");
312                 RETURN(-EINVAL);
313         }
314
315         if (atomic_read(&type->typ_refcnt)) {
316                 CERROR("type %s has refcount (%d)\n", name,
317                        atomic_read(&type->typ_refcnt));
318                 /* This is a bad situation, let's make the best of it */
319                 /* Remove ops, but leave the name for debugging */
320                 type->typ_dt_ops = NULL;
321                 type->typ_md_ops = NULL;
322                 GOTO(out_put, rc = -EBUSY);
323         }
324
325         /* Put the final ref */
326         kobject_put(&type->typ_kobj);
327 out_put:
328         /* Put the ref returned by class_search_type() */
329         kobject_put(&type->typ_kobj);
330
331         RETURN(rc);
332 } /* class_unregister_type */
333 EXPORT_SYMBOL(class_unregister_type);
334
335 /**
336  * Create a new obd device.
337  *
338  * Allocate the new obd_device and initialize it.
339  *
340  * \param[in] type_name obd device type string.
341  * \param[in] name      obd device name.
342  * \param[in] uuid      obd device UUID
343  *
344  * \retval newdev         pointer to created obd_device
345  * \retval ERR_PTR(errno) on error
346  */
347 struct obd_device *class_newdev(const char *type_name, const char *name,
348                                 const char *uuid)
349 {
350         struct obd_device *newdev;
351         struct obd_type *type = NULL;
352         ENTRY;
353
354         if (strlen(name) >= MAX_OBD_NAME) {
355                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
356                 RETURN(ERR_PTR(-EINVAL));
357         }
358
359         type = class_get_type(type_name);
360         if (type == NULL){
361                 CERROR("OBD: unknown type: %s\n", type_name);
362                 RETURN(ERR_PTR(-ENODEV));
363         }
364
365         newdev = obd_device_alloc();
366         if (newdev == NULL) {
367                 class_put_type(type);
368                 RETURN(ERR_PTR(-ENOMEM));
369         }
370         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
371         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
372         newdev->obd_type = type;
373         newdev->obd_minor = -1;
374
375         rwlock_init(&newdev->obd_pool_lock);
376         newdev->obd_pool_limit = 0;
377         newdev->obd_pool_slv = 0;
378
379         INIT_LIST_HEAD(&newdev->obd_exports);
380         newdev->obd_num_exports = 0;
381         newdev->obd_grant_check_threshold = 100;
382         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
383         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
384         INIT_LIST_HEAD(&newdev->obd_exports_timed);
385         INIT_LIST_HEAD(&newdev->obd_nid_stats);
386         spin_lock_init(&newdev->obd_nid_lock);
387         spin_lock_init(&newdev->obd_dev_lock);
388         mutex_init(&newdev->obd_dev_mutex);
389         spin_lock_init(&newdev->obd_osfs_lock);
390         /* newdev->obd_osfs_age must be set to a value in the distant
391          * past to guarantee a fresh statfs is fetched on mount. */
392         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
393
394         /* XXX belongs in setup not attach  */
395         init_rwsem(&newdev->obd_observer_link_sem);
396         /* recovery data */
397         spin_lock_init(&newdev->obd_recovery_task_lock);
398         init_waitqueue_head(&newdev->obd_next_transno_waitq);
399         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
400         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
401         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
402         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
403         INIT_LIST_HEAD(&newdev->obd_evict_list);
404         INIT_LIST_HEAD(&newdev->obd_lwp_list);
405
406         llog_group_init(&newdev->obd_olg);
407         /* Detach drops this */
408         atomic_set(&newdev->obd_refcount, 1);
409         lu_ref_init(&newdev->obd_reference);
410         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
411
412         newdev->obd_conn_inprogress = 0;
413
414         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
415
416         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
417                newdev->obd_name, newdev);
418
419         return newdev;
420 }
421
422 /**
423  * Free obd device.
424  *
425  * \param[in] obd obd_device to be freed
426  *
427  * \retval none
428  */
429 void class_free_dev(struct obd_device *obd)
430 {
431         struct obd_type *obd_type = obd->obd_type;
432
433         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
434                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
435         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
436                  "obd %p != obd_devs[%d] %p\n",
437                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
438         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
439                  "obd_refcount should be 0, not %d\n",
440                  atomic_read(&obd->obd_refcount));
441         LASSERT(obd_type != NULL);
442
443         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
444                obd->obd_name, obd->obd_type->typ_name);
445
446         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
447                          obd->obd_name, obd->obd_uuid.uuid);
448         if (obd->obd_stopping) {
449                 int err;
450
451                 /* If we're not stopping, we were never set up */
452                 err = obd_cleanup(obd);
453                 if (err)
454                         CERROR("Cleanup %s returned %d\n",
455                                 obd->obd_name, err);
456         }
457
458         obd_device_free(obd);
459
460         class_put_type(obd_type);
461 }
462
463 /**
464  * Unregister obd device.
465  *
466  * Free slot in obd_dev[] used by \a obd.
467  *
468  * \param[in] new_obd obd_device to be unregistered
469  *
470  * \retval none
471  */
472 void class_unregister_device(struct obd_device *obd)
473 {
474         write_lock(&obd_dev_lock);
475         if (obd->obd_minor >= 0) {
476                 LASSERT(obd_devs[obd->obd_minor] == obd);
477                 obd_devs[obd->obd_minor] = NULL;
478                 obd->obd_minor = -1;
479         }
480         write_unlock(&obd_dev_lock);
481 }
482
483 /**
484  * Register obd device.
485  *
486  * Find free slot in obd_devs[], fills it with \a new_obd.
487  *
488  * \param[in] new_obd obd_device to be registered
489  *
490  * \retval 0          success
491  * \retval -EEXIST    device with this name is registered
492  * \retval -EOVERFLOW obd_devs[] is full
493  */
494 int class_register_device(struct obd_device *new_obd)
495 {
496         int ret = 0;
497         int i;
498         int new_obd_minor = 0;
499         bool minor_assign = false;
500         bool retried = false;
501
502 again:
503         write_lock(&obd_dev_lock);
504         for (i = 0; i < class_devno_max(); i++) {
505                 struct obd_device *obd = class_num2obd(i);
506
507                 if (obd != NULL &&
508                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
509
510                         if (!retried) {
511                                 write_unlock(&obd_dev_lock);
512
513                                 /* the obd_device could be waited to be
514                                  * destroyed by the "obd_zombie_impexp_thread".
515                                  */
516                                 obd_zombie_barrier();
517                                 retried = true;
518                                 goto again;
519                         }
520
521                         CERROR("%s: already exists, won't add\n",
522                                obd->obd_name);
523                         /* in case we found a free slot before duplicate */
524                         minor_assign = false;
525                         ret = -EEXIST;
526                         break;
527                 }
528                 if (!minor_assign && obd == NULL) {
529                         new_obd_minor = i;
530                         minor_assign = true;
531                 }
532         }
533
534         if (minor_assign) {
535                 new_obd->obd_minor = new_obd_minor;
536                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
537                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
538                 obd_devs[new_obd_minor] = new_obd;
539         } else {
540                 if (ret == 0) {
541                         ret = -EOVERFLOW;
542                         CERROR("%s: all %u/%u devices used, increase "
543                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
544                                i, class_devno_max(), ret);
545                 }
546         }
547         write_unlock(&obd_dev_lock);
548
549         RETURN(ret);
550 }
551
552 static int class_name2dev_nolock(const char *name)
553 {
554         int i;
555
556         if (!name)
557                 return -1;
558
559         for (i = 0; i < class_devno_max(); i++) {
560                 struct obd_device *obd = class_num2obd(i);
561
562                 if (obd && strcmp(name, obd->obd_name) == 0) {
563                         /* Make sure we finished attaching before we give
564                            out any references */
565                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
566                         if (obd->obd_attached) {
567                                 return i;
568                         }
569                         break;
570                 }
571         }
572
573         return -1;
574 }
575
576 int class_name2dev(const char *name)
577 {
578         int i;
579
580         if (!name)
581                 return -1;
582
583         read_lock(&obd_dev_lock);
584         i = class_name2dev_nolock(name);
585         read_unlock(&obd_dev_lock);
586
587         return i;
588 }
589 EXPORT_SYMBOL(class_name2dev);
590
591 struct obd_device *class_name2obd(const char *name)
592 {
593         int dev = class_name2dev(name);
594
595         if (dev < 0 || dev > class_devno_max())
596                 return NULL;
597         return class_num2obd(dev);
598 }
599 EXPORT_SYMBOL(class_name2obd);
600
601 int class_uuid2dev_nolock(struct obd_uuid *uuid)
602 {
603         int i;
604
605         for (i = 0; i < class_devno_max(); i++) {
606                 struct obd_device *obd = class_num2obd(i);
607
608                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
609                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
610                         return i;
611                 }
612         }
613
614         return -1;
615 }
616
617 int class_uuid2dev(struct obd_uuid *uuid)
618 {
619         int i;
620
621         read_lock(&obd_dev_lock);
622         i = class_uuid2dev_nolock(uuid);
623         read_unlock(&obd_dev_lock);
624
625         return i;
626 }
627 EXPORT_SYMBOL(class_uuid2dev);
628
629 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
630 {
631         int dev = class_uuid2dev(uuid);
632         if (dev < 0)
633                 return NULL;
634         return class_num2obd(dev);
635 }
636 EXPORT_SYMBOL(class_uuid2obd);
637
638 /**
639  * Get obd device from ::obd_devs[]
640  *
641  * \param num [in] array index
642  *
643  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
644  *         otherwise return the obd device there.
645  */
646 struct obd_device *class_num2obd(int num)
647 {
648         struct obd_device *obd = NULL;
649
650         if (num < class_devno_max()) {
651                 obd = obd_devs[num];
652                 if (obd == NULL)
653                         return NULL;
654
655                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
656                          "%p obd_magic %08x != %08x\n",
657                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
658                 LASSERTF(obd->obd_minor == num,
659                          "%p obd_minor %0d != %0d\n",
660                          obd, obd->obd_minor, num);
661         }
662
663         return obd;
664 }
665 EXPORT_SYMBOL(class_num2obd);
666
667 /**
668  * Find obd in obd_dev[] by name or uuid.
669  *
670  * Increment obd's refcount if found.
671  *
672  * \param[in] str obd name or uuid
673  *
674  * \retval NULL    if not found
675  * \retval target  pointer to found obd_device
676  */
677 struct obd_device *class_dev_by_str(const char *str)
678 {
679         struct obd_device *target = NULL;
680         struct obd_uuid tgtuuid;
681         int rc;
682
683         obd_str2uuid(&tgtuuid, str);
684
685         read_lock(&obd_dev_lock);
686         rc = class_uuid2dev_nolock(&tgtuuid);
687         if (rc < 0)
688                 rc = class_name2dev_nolock(str);
689
690         if (rc >= 0)
691                 target = class_num2obd(rc);
692
693         if (target != NULL)
694                 class_incref(target, "find", current);
695         read_unlock(&obd_dev_lock);
696
697         RETURN(target);
698 }
699 EXPORT_SYMBOL(class_dev_by_str);
700
701 /**
702  * Get obd devices count. Device in any
703  *    state are counted
704  * \retval obd device count
705  */
706 int get_devices_count(void)
707 {
708         int index, max_index = class_devno_max(), dev_count = 0;
709
710         read_lock(&obd_dev_lock);
711         for (index = 0; index <= max_index; index++) {
712                 struct obd_device *obd = class_num2obd(index);
713                 if (obd != NULL)
714                         dev_count++;
715         }
716         read_unlock(&obd_dev_lock);
717
718         return dev_count;
719 }
720 EXPORT_SYMBOL(get_devices_count);
721
722 void class_obd_list(void)
723 {
724         char *status;
725         int i;
726
727         read_lock(&obd_dev_lock);
728         for (i = 0; i < class_devno_max(); i++) {
729                 struct obd_device *obd = class_num2obd(i);
730
731                 if (obd == NULL)
732                         continue;
733                 if (obd->obd_stopping)
734                         status = "ST";
735                 else if (obd->obd_set_up)
736                         status = "UP";
737                 else if (obd->obd_attached)
738                         status = "AT";
739                 else
740                         status = "--";
741                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
742                          i, status, obd->obd_type->typ_name,
743                          obd->obd_name, obd->obd_uuid.uuid,
744                          atomic_read(&obd->obd_refcount));
745         }
746         read_unlock(&obd_dev_lock);
747 }
748
749 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
750  * specified, then only the client with that uuid is returned,
751  * otherwise any client connected to the tgt is returned.
752  */
753 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
754                                          const char *type_name,
755                                          struct obd_uuid *grp_uuid)
756 {
757         int i;
758
759         read_lock(&obd_dev_lock);
760         for (i = 0; i < class_devno_max(); i++) {
761                 struct obd_device *obd = class_num2obd(i);
762
763                 if (obd == NULL)
764                         continue;
765                 if ((strncmp(obd->obd_type->typ_name, type_name,
766                              strlen(type_name)) == 0)) {
767                         if (obd_uuid_equals(tgt_uuid,
768                                             &obd->u.cli.cl_target_uuid) &&
769                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
770                                                          &obd->obd_uuid) : 1)) {
771                                 read_unlock(&obd_dev_lock);
772                                 return obd;
773                         }
774                 }
775         }
776         read_unlock(&obd_dev_lock);
777
778         return NULL;
779 }
780 EXPORT_SYMBOL(class_find_client_obd);
781
782 /* Iterate the obd_device list looking devices have grp_uuid. Start
783  * searching at *next, and if a device is found, the next index to look
784  * at is saved in *next. If next is NULL, then the first matching device
785  * will always be returned.
786  */
787 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
788 {
789         int i;
790
791         if (next == NULL)
792                 i = 0;
793         else if (*next >= 0 && *next < class_devno_max())
794                 i = *next;
795         else
796                 return NULL;
797
798         read_lock(&obd_dev_lock);
799         for (; i < class_devno_max(); i++) {
800                 struct obd_device *obd = class_num2obd(i);
801
802                 if (obd == NULL)
803                         continue;
804                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
805                         if (next != NULL)
806                                 *next = i+1;
807                         read_unlock(&obd_dev_lock);
808                         return obd;
809                 }
810         }
811         read_unlock(&obd_dev_lock);
812
813         return NULL;
814 }
815 EXPORT_SYMBOL(class_devices_in_group);
816
/**
 * Notify that the sptlrpc log for \a fsname has changed, so that every
 * relevant OBD can adjust its sptlrpc settings accordingly.
 *
 * A device is notified when it is set up and not stopping, is of one of
 * the types listed below, and its name starts with the first \a namelen
 * bytes of \a fsname.
 *
 * \param[in] fsname  filesystem name to match device names against
 * \param[in] namelen number of bytes of \a fsname to compare; must be > 0
 *
 * \retval 0     every notification succeeded (or none was needed)
 * \retval rc    the first non-zero result of obd_set_info_async()
 */
int class_notify_sptlrpc_conf(const char *fsname, int namelen)
{
	struct obd_device  *obd;
	const char         *type;
	int                 i, rc = 0, rc2;

	LASSERT(namelen > 0);

	read_lock(&obd_dev_lock);
	for (i = 0; i < class_devno_max(); i++) {
		obd = class_num2obd(i);

		if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
			continue;

		/* only notify mdc, osc, osp, lwp, mdt, ost
		 * because only these have a -sptlrpc llog */
		type = obd->obd_type->typ_name;
		if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
		    strcmp(type, LUSTRE_OSC_NAME) != 0 &&
		    strcmp(type, LUSTRE_OSP_NAME) != 0 &&
		    strcmp(type, LUSTRE_LWP_NAME) != 0 &&
		    strcmp(type, LUSTRE_MDT_NAME) != 0 &&
		    strcmp(type, LUSTRE_OST_NAME) != 0)
			continue;

		/* Prefix match of the device name against fsname. */
		if (strncmp(obd->obd_name, fsname, namelen))
			continue;

		/*
		 * Hold a reference on the device, then drop obd_dev_lock
		 * around the notification call; the lock is retaken before
		 * the scan continues.
		 */
		class_incref(obd, __FUNCTION__, obd);
		read_unlock(&obd_dev_lock);
		rc2 = obd_set_info_async(NULL, obd->obd_self_export,
					 sizeof(KEY_SPTLRPC_CONF),
					 KEY_SPTLRPC_CONF, 0, NULL, NULL);
		/* Keep the first error but go on notifying the rest. */
		rc = rc ? rc : rc2;
		class_decref(obd, __FUNCTION__, obd);
		read_lock(&obd_dev_lock);
	}
	read_unlock(&obd_dev_lock);
	return rc;
}
862 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
863
864 void obd_cleanup_caches(void)
865 {
866         ENTRY;
867         if (obd_device_cachep) {
868                 kmem_cache_destroy(obd_device_cachep);
869                 obd_device_cachep = NULL;
870         }
871
872         EXIT;
873 }
874
875 int obd_init_caches(void)
876 {
877         int rc;
878         ENTRY;
879
880         LASSERT(obd_device_cachep == NULL);
881         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
882                                 sizeof(struct obd_device),
883                                 0, 0, 0, sizeof(struct obd_device), NULL);
884         if (!obd_device_cachep)
885                 GOTO(out, rc = -ENOMEM);
886
887         RETURN(0);
888 out:
889         obd_cleanup_caches();
890         RETURN(rc);
891 }
892
893 static const char export_handle_owner[] = "export";
894
895 /* map connection to client */
896 struct obd_export *class_conn2export(struct lustre_handle *conn)
897 {
898         struct obd_export *export;
899         ENTRY;
900
901         if (!conn) {
902                 CDEBUG(D_CACHE, "looking for null handle\n");
903                 RETURN(NULL);
904         }
905
906         if (conn->cookie == -1) {  /* this means assign a new connection */
907                 CDEBUG(D_CACHE, "want a new connection\n");
908                 RETURN(NULL);
909         }
910
911         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
912         export = class_handle2object(conn->cookie, export_handle_owner);
913         RETURN(export);
914 }
915 EXPORT_SYMBOL(class_conn2export);
916
917 struct obd_device *class_exp2obd(struct obd_export *exp)
918 {
919         if (exp)
920                 return exp->exp_obd;
921         return NULL;
922 }
923 EXPORT_SYMBOL(class_exp2obd);
924
925 struct obd_import *class_exp2cliimp(struct obd_export *exp)
926 {
927         struct obd_device *obd = exp->exp_obd;
928         if (obd == NULL)
929                 return NULL;
930         return obd->u.cli.cl_import;
931 }
932 EXPORT_SYMBOL(class_exp2cliimp);
933
934 /* Export management functions */
935 static void class_export_destroy(struct obd_export *exp)
936 {
937         struct obd_device *obd = exp->exp_obd;
938         ENTRY;
939
940         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
941         LASSERT(obd != NULL);
942
943         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
944                exp->exp_client_uuid.uuid, obd->obd_name);
945
946         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
947         ptlrpc_connection_put(exp->exp_connection);
948
949         LASSERT(list_empty(&exp->exp_outstanding_replies));
950         LASSERT(list_empty(&exp->exp_uncommitted_replies));
951         LASSERT(list_empty(&exp->exp_req_replay_queue));
952         LASSERT(list_empty(&exp->exp_hp_rpcs));
953         obd_destroy_export(exp);
954         /* self export doesn't hold a reference to an obd, although it
955          * exists until freeing of the obd */
956         if (exp != obd->obd_self_export)
957                 class_decref(obd, "export", exp);
958
959         OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
960         kfree_rcu(exp, exp_handle.h_rcu);
961         EXIT;
962 }
963
964 struct obd_export *class_export_get(struct obd_export *exp)
965 {
966         refcount_inc(&exp->exp_handle.h_ref);
967         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
968                refcount_read(&exp->exp_handle.h_ref));
969         return exp;
970 }
971 EXPORT_SYMBOL(class_export_get);
972
973 void class_export_put(struct obd_export *exp)
974 {
975         LASSERT(exp != NULL);
976         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
977         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
978         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
979                refcount_read(&exp->exp_handle.h_ref) - 1);
980
981         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
982                 struct obd_device *obd = exp->exp_obd;
983
984                 CDEBUG(D_IOCTL, "final put %p/%s\n",
985                        exp, exp->exp_client_uuid.uuid);
986
987                 /* release nid stat refererence */
988                 lprocfs_exp_cleanup(exp);
989
990                 if (exp == obd->obd_self_export) {
991                         /* self export should be destroyed without
992                          * zombie thread as it doesn't hold a
993                          * reference to obd and doesn't hold any
994                          * resources */
995                         class_export_destroy(exp);
996                         /* self export is destroyed, no class
997                          * references exist and it is safe to free
998                          * obd */
999                         class_free_dev(obd);
1000                 } else {
1001                         LASSERT(!list_empty(&exp->exp_obd_chain));
1002                         obd_zombie_export_add(exp);
1003                 }
1004
1005         }
1006 }
1007 EXPORT_SYMBOL(class_export_put);
1008
1009 static void obd_zombie_exp_cull(struct work_struct *ws)
1010 {
1011         struct obd_export *export;
1012
1013         export = container_of(ws, struct obd_export, exp_zombie_work);
1014         class_export_destroy(export);
1015 }
1016
1017 /* Creates a new export, adds it to the hash table, and returns a
1018  * pointer to it. The refcount is 2: one for the hash reference, and
1019  * one for the pointer returned by this function. */
1020 struct obd_export *__class_new_export(struct obd_device *obd,
1021                                       struct obd_uuid *cluuid, bool is_self)
1022 {
1023         struct obd_export *export;
1024         int rc = 0;
1025         ENTRY;
1026
1027         OBD_ALLOC_PTR(export);
1028         if (!export)
1029                 return ERR_PTR(-ENOMEM);
1030
1031         export->exp_conn_cnt = 0;
1032         export->exp_lock_hash = NULL;
1033         export->exp_flock_hash = NULL;
1034         /* 2 = class_handle_hash + last */
1035         refcount_set(&export->exp_handle.h_ref, 2);
1036         atomic_set(&export->exp_rpc_count, 0);
1037         atomic_set(&export->exp_cb_count, 0);
1038         atomic_set(&export->exp_locks_count, 0);
1039 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1040         INIT_LIST_HEAD(&export->exp_locks_list);
1041         spin_lock_init(&export->exp_locks_list_guard);
1042 #endif
1043         atomic_set(&export->exp_replay_count, 0);
1044         export->exp_obd = obd;
1045         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1046         spin_lock_init(&export->exp_uncommitted_replies_lock);
1047         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1048         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1049         INIT_HLIST_NODE(&export->exp_handle.h_link);
1050         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1051         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1052         class_handle_hash(&export->exp_handle, export_handle_owner);
1053         export->exp_last_request_time = ktime_get_real_seconds();
1054         spin_lock_init(&export->exp_lock);
1055         spin_lock_init(&export->exp_rpc_lock);
1056         INIT_HLIST_NODE(&export->exp_gen_hash);
1057         spin_lock_init(&export->exp_bl_list_lock);
1058         INIT_LIST_HEAD(&export->exp_bl_list);
1059         INIT_LIST_HEAD(&export->exp_stale_list);
1060         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1061
1062         export->exp_sp_peer = LUSTRE_SP_ANY;
1063         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1064         export->exp_client_uuid = *cluuid;
1065         obd_init_export(export);
1066
1067         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1068
1069         spin_lock(&obd->obd_dev_lock);
1070         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1071                 /* shouldn't happen, but might race */
1072                 if (obd->obd_stopping)
1073                         GOTO(exit_unlock, rc = -ENODEV);
1074
1075                 rc = obd_uuid_add(obd, export);
1076                 if (rc != 0) {
1077                         LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1078                                       obd->obd_name, cluuid->uuid, rc);
1079                         GOTO(exit_unlock, rc = -EALREADY);
1080                 }
1081         }
1082
1083         if (!is_self) {
1084                 class_incref(obd, "export", export);
1085                 list_add_tail(&export->exp_obd_chain_timed,
1086                               &obd->obd_exports_timed);
1087                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1088                 obd->obd_num_exports++;
1089         } else {
1090                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1091                 INIT_LIST_HEAD(&export->exp_obd_chain);
1092         }
1093         spin_unlock(&obd->obd_dev_lock);
1094         RETURN(export);
1095
1096 exit_unlock:
1097         spin_unlock(&obd->obd_dev_lock);
1098         class_handle_unhash(&export->exp_handle);
1099         obd_destroy_export(export);
1100         OBD_FREE_PTR(export);
1101         return ERR_PTR(rc);
1102 }
1103
1104 struct obd_export *class_new_export(struct obd_device *obd,
1105                                     struct obd_uuid *uuid)
1106 {
1107         return __class_new_export(obd, uuid, false);
1108 }
1109 EXPORT_SYMBOL(class_new_export);
1110
1111 struct obd_export *class_new_export_self(struct obd_device *obd,
1112                                          struct obd_uuid *uuid)
1113 {
1114         return __class_new_export(obd, uuid, true);
1115 }
1116
1117 void class_unlink_export(struct obd_export *exp)
1118 {
1119         class_handle_unhash(&exp->exp_handle);
1120
1121         if (exp->exp_obd->obd_self_export == exp) {
1122                 class_export_put(exp);
1123                 return;
1124         }
1125
1126         spin_lock(&exp->exp_obd->obd_dev_lock);
1127         /* delete an uuid-export hashitem from hashtables */
1128         if (exp != exp->exp_obd->obd_self_export)
1129                 obd_uuid_del(exp->exp_obd, exp);
1130
1131 #ifdef HAVE_SERVER_SUPPORT
1132         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1133                 struct tg_export_data   *ted = &exp->exp_target_data;
1134                 struct cfs_hash         *hash;
1135
1136                 /* Because obd_gen_hash will not be released until
1137                  * class_cleanup(), so hash should never be NULL here */
1138                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1139                 LASSERT(hash != NULL);
1140                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1141                              &exp->exp_gen_hash);
1142                 cfs_hash_putref(hash);
1143         }
1144 #endif /* HAVE_SERVER_SUPPORT */
1145
1146         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1147         list_del_init(&exp->exp_obd_chain_timed);
1148         exp->exp_obd->obd_num_exports--;
1149         spin_unlock(&exp->exp_obd->obd_dev_lock);
1150         atomic_inc(&obd_stale_export_num);
1151
1152         /* A reference is kept by obd_stale_exports list */
1153         obd_stale_export_put(exp);
1154 }
1155 EXPORT_SYMBOL(class_unlink_export);
1156
1157 /* Import management functions */
1158 static void obd_zombie_import_free(struct obd_import *imp)
1159 {
1160         struct obd_import_conn *imp_conn;
1161
1162         ENTRY;
1163         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1164                imp->imp_obd->obd_name);
1165
1166         LASSERT(refcount_read(&imp->imp_refcount) == 0);
1167
1168         ptlrpc_connection_put(imp->imp_connection);
1169
1170         while ((imp_conn = list_first_entry_or_null(&imp->imp_conn_list,
1171                                                     struct obd_import_conn,
1172                                                     oic_item)) != NULL) {
1173                 list_del_init(&imp_conn->oic_item);
1174                 ptlrpc_connection_put(imp_conn->oic_conn);
1175                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1176         }
1177
1178         LASSERT(imp->imp_sec == NULL);
1179         LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1180                  imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1181         class_decref(imp->imp_obd, "import", imp);
1182         OBD_FREE_PTR(imp);
1183         EXIT;
1184 }
1185
1186 struct obd_import *class_import_get(struct obd_import *import)
1187 {
1188         refcount_inc(&import->imp_refcount);
1189         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1190                refcount_read(&import->imp_refcount),
1191                import->imp_obd->obd_name);
1192         return import;
1193 }
1194 EXPORT_SYMBOL(class_import_get);
1195
1196 void class_import_put(struct obd_import *imp)
1197 {
1198         ENTRY;
1199
1200         LASSERT(refcount_read(&imp->imp_refcount) > 0);
1201
1202         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1203                refcount_read(&imp->imp_refcount) - 1,
1204                imp->imp_obd->obd_name);
1205
1206         if (refcount_dec_and_test(&imp->imp_refcount)) {
1207                 CDEBUG(D_INFO, "final put import %p\n", imp);
1208                 obd_zombie_import_add(imp);
1209         }
1210
1211         EXIT;
1212 }
1213 EXPORT_SYMBOL(class_import_put);
1214
1215 static void init_imp_at(struct imp_at *at) {
1216         int i;
1217         at_init(&at->iat_net_latency, 0, 0);
1218         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1219                 /* max service estimates are tracked on the server side, so
1220                    don't use the AT history here, just use the last reported
1221                    val. (But keep hist for proc histogram, worst_ever) */
1222                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1223                         AT_FLG_NOHIST);
1224         }
1225 }
1226
1227 static void obd_zombie_imp_cull(struct work_struct *ws)
1228 {
1229         struct obd_import *import;
1230
1231         import = container_of(ws, struct obd_import, imp_zombie_work);
1232         obd_zombie_import_free(import);
1233 }
1234
1235 struct obd_import *class_new_import(struct obd_device *obd)
1236 {
1237         struct obd_import *imp;
1238         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1239
1240         OBD_ALLOC(imp, sizeof(*imp));
1241         if (imp == NULL)
1242                 return NULL;
1243
1244         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1245         INIT_LIST_HEAD(&imp->imp_replay_list);
1246         INIT_LIST_HEAD(&imp->imp_sending_list);
1247         INIT_LIST_HEAD(&imp->imp_delayed_list);
1248         INIT_LIST_HEAD(&imp->imp_committed_list);
1249         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1250         imp->imp_known_replied_xid = 0;
1251         imp->imp_replay_cursor = &imp->imp_committed_list;
1252         spin_lock_init(&imp->imp_lock);
1253         imp->imp_last_success_conn = 0;
1254         imp->imp_state = LUSTRE_IMP_NEW;
1255         imp->imp_obd = class_incref(obd, "import", imp);
1256         rwlock_init(&imp->imp_sec_lock);
1257         init_waitqueue_head(&imp->imp_recovery_waitq);
1258         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1259
1260         if (curr_pid_ns && curr_pid_ns->child_reaper)
1261                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1262         else
1263                 imp->imp_sec_refpid = 1;
1264
1265         refcount_set(&imp->imp_refcount, 2);
1266         atomic_set(&imp->imp_unregistering, 0);
1267         atomic_set(&imp->imp_reqs, 0);
1268         atomic_set(&imp->imp_inflight, 0);
1269         atomic_set(&imp->imp_replay_inflight, 0);
1270         init_waitqueue_head(&imp->imp_replay_waitq);
1271         atomic_set(&imp->imp_inval_count, 0);
1272         INIT_LIST_HEAD(&imp->imp_conn_list);
1273         init_imp_at(&imp->imp_at);
1274
1275         /* the default magic is V2, will be used in connect RPC, and
1276          * then adjusted according to the flags in request/reply. */
1277         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1278
1279         return imp;
1280 }
1281 EXPORT_SYMBOL(class_new_import);
1282
1283 void class_destroy_import(struct obd_import *import)
1284 {
1285         LASSERT(import != NULL);
1286         LASSERT(import != LP_POISON);
1287
1288         spin_lock(&import->imp_lock);
1289         import->imp_generation++;
1290         spin_unlock(&import->imp_lock);
1291         class_import_put(import);
1292 }
1293 EXPORT_SYMBOL(class_destroy_import);
1294
1295 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1296
1297 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1298 {
1299         spin_lock(&exp->exp_locks_list_guard);
1300
1301         LASSERT(lock->l_exp_refs_nr >= 0);
1302
1303         if (lock->l_exp_refs_target != NULL &&
1304             lock->l_exp_refs_target != exp) {
1305                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1306                               exp, lock, lock->l_exp_refs_target);
1307         }
1308         if ((lock->l_exp_refs_nr ++) == 0) {
1309                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1310                 lock->l_exp_refs_target = exp;
1311         }
1312         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1313                lock, exp, lock->l_exp_refs_nr);
1314         spin_unlock(&exp->exp_locks_list_guard);
1315 }
1316 EXPORT_SYMBOL(__class_export_add_lock_ref);
1317
1318 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1319 {
1320         spin_lock(&exp->exp_locks_list_guard);
1321         LASSERT(lock->l_exp_refs_nr > 0);
1322         if (lock->l_exp_refs_target != exp) {
1323                 LCONSOLE_WARN("lock %p, "
1324                               "mismatching export pointers: %p, %p\n",
1325                               lock, lock->l_exp_refs_target, exp);
1326         }
1327         if (-- lock->l_exp_refs_nr == 0) {
1328                 list_del_init(&lock->l_exp_refs_link);
1329                 lock->l_exp_refs_target = NULL;
1330         }
1331         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1332                lock, exp, lock->l_exp_refs_nr);
1333         spin_unlock(&exp->exp_locks_list_guard);
1334 }
1335 EXPORT_SYMBOL(__class_export_del_lock_ref);
1336 #endif
1337
1338 /* A connection defines an export context in which preallocation can
1339    be managed. This releases the export pointer reference, and returns
1340    the export handle, so the export refcount is 1 when this function
1341    returns. */
1342 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1343                   struct obd_uuid *cluuid)
1344 {
1345         struct obd_export *export;
1346         LASSERT(conn != NULL);
1347         LASSERT(obd != NULL);
1348         LASSERT(cluuid != NULL);
1349         ENTRY;
1350
1351         export = class_new_export(obd, cluuid);
1352         if (IS_ERR(export))
1353                 RETURN(PTR_ERR(export));
1354
1355         conn->cookie = export->exp_handle.h_cookie;
1356         class_export_put(export);
1357
1358         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1359                cluuid->uuid, conn->cookie);
1360         RETURN(0);
1361 }
1362 EXPORT_SYMBOL(class_connect);
1363
1364 /* if export is involved in recovery then clean up related things */
1365 static void class_export_recovery_cleanup(struct obd_export *exp)
1366 {
1367         struct obd_device *obd = exp->exp_obd;
1368
1369         spin_lock(&obd->obd_recovery_task_lock);
1370         if (obd->obd_recovering) {
1371                 if (exp->exp_in_recovery) {
1372                         spin_lock(&exp->exp_lock);
1373                         exp->exp_in_recovery = 0;
1374                         spin_unlock(&exp->exp_lock);
1375                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1376                         atomic_dec(&obd->obd_connected_clients);
1377                 }
1378
1379                 /* if called during recovery then should update
1380                  * obd_stale_clients counter,
1381                  * lightweight exports are not counted */
1382                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1383                         exp->exp_obd->obd_stale_clients++;
1384         }
1385         spin_unlock(&obd->obd_recovery_task_lock);
1386
1387         spin_lock(&exp->exp_lock);
1388         /** Cleanup req replay fields */
1389         if (exp->exp_req_replay_needed) {
1390                 exp->exp_req_replay_needed = 0;
1391
1392                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1393                 atomic_dec(&obd->obd_req_replay_clients);
1394         }
1395
1396         /** Cleanup lock replay data */
1397         if (exp->exp_lock_replay_needed) {
1398                 exp->exp_lock_replay_needed = 0;
1399
1400                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1401                 atomic_dec(&obd->obd_lock_replay_clients);
1402         }
1403         spin_unlock(&exp->exp_lock);
1404 }
1405
1406 /* This function removes 1-3 references from the export:
1407  * 1 - for export pointer passed
1408  * and if disconnect really need
1409  * 2 - removing from hash
1410  * 3 - in client_unlink_export
1411  * The export pointer passed to this function can destroyed */
1412 int class_disconnect(struct obd_export *export)
1413 {
1414         int already_disconnected;
1415         ENTRY;
1416
1417         if (export == NULL) {
1418                 CWARN("attempting to free NULL export %p\n", export);
1419                 RETURN(-EINVAL);
1420         }
1421
1422         spin_lock(&export->exp_lock);
1423         already_disconnected = export->exp_disconnected;
1424         export->exp_disconnected = 1;
1425 #ifdef HAVE_SERVER_SUPPORT
1426         /*  We hold references of export for uuid hash
1427          *  and nid_hash and export link at least. So
1428          *  it is safe to call rh*table_remove_fast in
1429          *  there.
1430          */
1431         obd_nid_del(export->exp_obd, export);
1432 #endif /* HAVE_SERVER_SUPPORT */
1433         spin_unlock(&export->exp_lock);
1434
1435         /* class_cleanup(), abort_recovery(), and class_fail_export()
1436          * all end up in here, and if any of them race we shouldn't
1437          * call extra class_export_puts(). */
1438         if (already_disconnected)
1439                 GOTO(no_disconn, already_disconnected);
1440
1441         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1442                export->exp_handle.h_cookie);
1443
1444         class_export_recovery_cleanup(export);
1445         class_unlink_export(export);
1446 no_disconn:
1447         class_export_put(export);
1448         RETURN(0);
1449 }
1450 EXPORT_SYMBOL(class_disconnect);
1451
1452 /* Return non-zero for a fully connected export */
1453 int class_connected_export(struct obd_export *exp)
1454 {
1455         int connected = 0;
1456
1457         if (exp) {
1458                 spin_lock(&exp->exp_lock);
1459                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1460                 spin_unlock(&exp->exp_lock);
1461         }
1462         return connected;
1463 }
1464 EXPORT_SYMBOL(class_connected_export);
1465
1466 static void class_disconnect_export_list(struct list_head *list,
1467                                          enum obd_option flags)
1468 {
1469         int rc;
1470         struct obd_export *exp;
1471         ENTRY;
1472
1473         /* It's possible that an export may disconnect itself, but
1474          * nothing else will be added to this list.
1475          */
1476         while ((exp = list_first_entry_or_null(list, struct obd_export,
1477                                                exp_obd_chain)) != NULL) {
1478                 /* need for safe call CDEBUG after obd_disconnect */
1479                 class_export_get(exp);
1480
1481                 spin_lock(&exp->exp_lock);
1482                 exp->exp_flags = flags;
1483                 spin_unlock(&exp->exp_lock);
1484
1485                 if (obd_uuid_equals(&exp->exp_client_uuid,
1486                                     &exp->exp_obd->obd_uuid)) {
1487                         CDEBUG(D_HA,
1488                                "exp %p export uuid == obd uuid, don't discon\n",
1489                                exp);
1490                         /* Need to delete this now so we don't end up pointing
1491                          * to work_list later when this export is cleaned up. */
1492                         list_del_init(&exp->exp_obd_chain);
1493                         class_export_put(exp);
1494                         continue;
1495                 }
1496
1497                 class_export_get(exp);
1498                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1499                        "last request at %lld\n",
1500                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1501                        exp, exp->exp_last_request_time);
1502                 /* release one export reference anyway */
1503                 rc = obd_disconnect(exp);
1504
1505                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1506                        obd_export_nid2str(exp), exp, rc);
1507                 class_export_put(exp);
1508         }
1509         EXIT;
1510 }
1511
1512 void class_disconnect_exports(struct obd_device *obd)
1513 {
1514         LIST_HEAD(work_list);
1515         ENTRY;
1516
1517         /* Move all of the exports from obd_exports to a work list, en masse. */
1518         spin_lock(&obd->obd_dev_lock);
1519         list_splice_init(&obd->obd_exports, &work_list);
1520         list_splice_init(&obd->obd_delayed_exports, &work_list);
1521         spin_unlock(&obd->obd_dev_lock);
1522
1523         if (!list_empty(&work_list)) {
1524                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1525                        "disconnecting them\n", obd->obd_minor, obd);
1526                 class_disconnect_export_list(&work_list,
1527                                              exp_flags_from_obd(obd));
1528         } else
1529                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1530                        obd->obd_minor, obd);
1531         EXIT;
1532 }
1533 EXPORT_SYMBOL(class_disconnect_exports);
1534
1535 /* Remove exports that have not completed recovery.
1536  */
1537 void class_disconnect_stale_exports(struct obd_device *obd,
1538                                     int (*test_export)(struct obd_export *))
1539 {
1540         LIST_HEAD(work_list);
1541         struct obd_export *exp, *n;
1542         int evicted = 0;
1543         ENTRY;
1544
1545         spin_lock(&obd->obd_dev_lock);
1546         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1547                                  exp_obd_chain) {
1548                 /* don't count self-export as client */
1549                 if (obd_uuid_equals(&exp->exp_client_uuid,
1550                                     &exp->exp_obd->obd_uuid))
1551                         continue;
1552
1553                 /* don't evict clients which have no slot in last_rcvd
1554                  * (e.g. lightweight connection) */
1555                 if (exp->exp_target_data.ted_lr_idx == -1)
1556                         continue;
1557
1558                 spin_lock(&exp->exp_lock);
1559                 if (exp->exp_failed || test_export(exp)) {
1560                         spin_unlock(&exp->exp_lock);
1561                         continue;
1562                 }
1563                 exp->exp_failed = 1;
1564                 spin_unlock(&exp->exp_lock);
1565
1566                 list_move(&exp->exp_obd_chain, &work_list);
1567                 evicted++;
1568                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1569                        obd->obd_name, exp->exp_client_uuid.uuid,
1570                        obd_export_nid2str(exp));
1571                 print_export_data(exp, "EVICTING", 0, D_HA);
1572         }
1573         spin_unlock(&obd->obd_dev_lock);
1574
1575         if (evicted)
1576                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1577                               obd->obd_name, evicted);
1578
1579         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1580                                                  OBD_OPT_ABORT_RECOV);
1581         EXIT;
1582 }
1583 EXPORT_SYMBOL(class_disconnect_stale_exports);
1584
1585 void class_fail_export(struct obd_export *exp)
1586 {
1587         int rc, already_failed;
1588
1589         spin_lock(&exp->exp_lock);
1590         already_failed = exp->exp_failed;
1591         exp->exp_failed = 1;
1592         spin_unlock(&exp->exp_lock);
1593
1594         if (already_failed) {
1595                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1596                        exp, exp->exp_client_uuid.uuid);
1597                 return;
1598         }
1599
1600         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1601                exp, exp->exp_client_uuid.uuid);
1602
1603         if (obd_dump_on_timeout)
1604                 libcfs_debug_dumplog();
1605
1606         /* need for safe call CDEBUG after obd_disconnect */
1607         class_export_get(exp);
1608
1609         /* Most callers into obd_disconnect are removing their own reference
1610          * (request, for example) in addition to the one from the hash table.
1611          * We don't have such a reference here, so make one. */
1612         class_export_get(exp);
1613         rc = obd_disconnect(exp);
1614         if (rc)
1615                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1616         else
1617                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1618                        exp, exp->exp_client_uuid.uuid);
1619         class_export_put(exp);
1620 }
1621 EXPORT_SYMBOL(class_fail_export);
1622
1623 #ifdef HAVE_SERVER_SUPPORT
1624
1625 static int take_first(struct obd_export *exp, void *data)
1626 {
1627         struct obd_export **expp = data;
1628
1629         if (*expp)
1630                 /* already have one */
1631                 return 0;
1632         if (exp->exp_failed)
1633                 /* Don't want this one */
1634                 return 0;
1635         if (!refcount_inc_not_zero(&exp->exp_handle.h_ref))
1636                 /* Cannot get a ref on this one */
1637                 return 0;
1638         *expp = exp;
1639         return 1;
1640 }
1641
1642 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1643 {
1644         struct lnet_nid nid_key;
1645         struct obd_export *doomed_exp;
1646         int exports_evicted = 0;
1647
1648         libcfs_strnid(&nid_key, nid);
1649
1650         spin_lock(&obd->obd_dev_lock);
1651         /* umount has run already, so evict thread should leave
1652          * its task to umount thread now */
1653         if (obd->obd_stopping) {
1654                 spin_unlock(&obd->obd_dev_lock);
1655                 return exports_evicted;
1656         }
1657         spin_unlock(&obd->obd_dev_lock);
1658
1659         doomed_exp = NULL;
1660         while (obd_nid_export_for_each(obd, &nid_key,
1661                                        take_first, &doomed_exp) > 0) {
1662
1663                 LASSERTF(doomed_exp != obd->obd_self_export,
1664                          "self-export is hashed by NID?\n");
1665
1666                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1667                               obd->obd_name,
1668                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1669                               obd_export_nid2str(doomed_exp));
1670
1671                 class_fail_export(doomed_exp);
1672                 class_export_put(doomed_exp);
1673                 exports_evicted++;
1674                 doomed_exp = NULL;
1675         }
1676
1677         if (!exports_evicted)
1678                 CDEBUG(D_HA,
1679                        "%s: can't disconnect NID '%s': no exports found\n",
1680                        obd->obd_name, nid);
1681         return exports_evicted;
1682 }
1683 EXPORT_SYMBOL(obd_export_evict_by_nid);
1684
1685 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1686 {
1687         struct obd_export *doomed_exp = NULL;
1688         struct obd_uuid doomed_uuid;
1689         int exports_evicted = 0;
1690
1691         spin_lock(&obd->obd_dev_lock);
1692         if (obd->obd_stopping) {
1693                 spin_unlock(&obd->obd_dev_lock);
1694                 return exports_evicted;
1695         }
1696         spin_unlock(&obd->obd_dev_lock);
1697
1698         obd_str2uuid(&doomed_uuid, uuid);
1699         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1700                 CERROR("%s: can't evict myself\n", obd->obd_name);
1701                 return exports_evicted;
1702         }
1703
1704         doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1705         if (doomed_exp == NULL) {
1706                 CERROR("%s: can't disconnect %s: no exports found\n",
1707                        obd->obd_name, uuid);
1708         } else {
1709                 CWARN("%s: evicting %s at adminstrative request\n",
1710                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1711                 class_fail_export(doomed_exp);
1712                 class_export_put(doomed_exp);
1713                 obd_uuid_del(obd, doomed_exp);
1714                 exports_evicted++;
1715         }
1716
1717         return exports_evicted;
1718 }
1719 #endif /* HAVE_SERVER_SUPPORT */
1720
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export callback.  print_export_data() calls it for each
 * export it prints when its 'locks' argument is non-zero; NULL means no
 * hook is installed. */
void (*class_export_dump_hook)(struct obd_export*) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1725
1726 static void print_export_data(struct obd_export *exp, const char *status,
1727                               int locks, int debug_level)
1728 {
1729         struct ptlrpc_reply_state *rs;
1730         struct ptlrpc_reply_state *first_reply = NULL;
1731         int nreplies = 0;
1732
1733         spin_lock(&exp->exp_lock);
1734         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1735                             rs_exp_list) {
1736                 if (nreplies == 0)
1737                         first_reply = rs;
1738                 nreplies++;
1739         }
1740         spin_unlock(&exp->exp_lock);
1741
1742         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1743                "%p %s %llu stale:%d\n",
1744                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1745                obd_export_nid2str(exp),
1746                refcount_read(&exp->exp_handle.h_ref),
1747                atomic_read(&exp->exp_rpc_count),
1748                atomic_read(&exp->exp_cb_count),
1749                atomic_read(&exp->exp_locks_count),
1750                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1751                nreplies, first_reply, nreplies > 3 ? "..." : "",
1752                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1753 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1754         if (locks && class_export_dump_hook != NULL)
1755                 class_export_dump_hook(exp);
1756 #endif
1757 }
1758
1759 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1760 {
1761         struct obd_export *exp;
1762
1763         spin_lock(&obd->obd_dev_lock);
1764         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1765                 print_export_data(exp, "ACTIVE", locks, debug_level);
1766         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1767                 print_export_data(exp, "UNLINKED", locks, debug_level);
1768         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1769                 print_export_data(exp, "DELAYED", locks, debug_level);
1770         spin_unlock(&obd->obd_dev_lock);
1771 }
1772
1773 void obd_exports_barrier(struct obd_device *obd)
1774 {
1775         int waited = 2;
1776         LASSERT(list_empty(&obd->obd_exports));
1777         spin_lock(&obd->obd_dev_lock);
1778         while (!list_empty(&obd->obd_unlinked_exports)) {
1779                 spin_unlock(&obd->obd_dev_lock);
1780                 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1781                 if (waited > 5 && is_power_of_2(waited)) {
1782                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1783                                       "more than %d seconds. "
1784                                       "The obd refcount = %d. Is it stuck?\n",
1785                                       obd->obd_name, waited,
1786                                       atomic_read(&obd->obd_refcount));
1787                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1788                 }
1789                 waited *= 2;
1790                 spin_lock(&obd->obd_dev_lock);
1791         }
1792         spin_unlock(&obd->obd_dev_lock);
1793 }
1794 EXPORT_SYMBOL(obd_exports_barrier);
1795
1796 /**
1797  * Add export to the obd_zombe thread and notify it.
1798  */
1799 static void obd_zombie_export_add(struct obd_export *exp) {
1800         atomic_dec(&obd_stale_export_num);
1801         spin_lock(&exp->exp_obd->obd_dev_lock);
1802         LASSERT(!list_empty(&exp->exp_obd_chain));
1803         list_del_init(&exp->exp_obd_chain);
1804         spin_unlock(&exp->exp_obd->obd_dev_lock);
1805
1806         queue_work(zombie_wq, &exp->exp_zombie_work);
1807 }
1808
1809 /**
1810  * Add import to the obd_zombe thread and notify it.
1811  */
1812 static void obd_zombie_import_add(struct obd_import *imp) {
1813         LASSERT(imp->imp_sec == NULL);
1814
1815         queue_work(zombie_wq, &imp->imp_zombie_work);
1816 }
1817
1818 /**
1819  * wait when obd_zombie import/export queues become empty
1820  */
1821 void obd_zombie_barrier(void)
1822 {
1823         flush_workqueue(zombie_wq);
1824 }
1825 EXPORT_SYMBOL(obd_zombie_barrier);
1826
1827
1828 struct obd_export *obd_stale_export_get(void)
1829 {
1830         struct obd_export *exp = NULL;
1831         ENTRY;
1832
1833         spin_lock(&obd_stale_export_lock);
1834         if (!list_empty(&obd_stale_exports)) {
1835                 exp = list_first_entry(&obd_stale_exports,
1836                                        struct obd_export, exp_stale_list);
1837                 list_del_init(&exp->exp_stale_list);
1838         }
1839         spin_unlock(&obd_stale_export_lock);
1840
1841         if (exp) {
1842                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1843                        atomic_read(&obd_stale_export_num));
1844         }
1845         RETURN(exp);
1846 }
1847 EXPORT_SYMBOL(obd_stale_export_get);
1848
1849 void obd_stale_export_put(struct obd_export *exp)
1850 {
1851         ENTRY;
1852
1853         LASSERT(list_empty(&exp->exp_stale_list));
1854         if (exp->exp_lock_hash &&
1855             atomic_read(&exp->exp_lock_hash->hs_count)) {
1856                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1857                        atomic_read(&obd_stale_export_num));
1858
1859                 spin_lock_bh(&exp->exp_bl_list_lock);
1860                 spin_lock(&obd_stale_export_lock);
1861                 /* Add to the tail if there is no blocked locks,
1862                  * to the head otherwise. */
1863                 if (list_empty(&exp->exp_bl_list))
1864                         list_add_tail(&exp->exp_stale_list,
1865                                       &obd_stale_exports);
1866                 else
1867                         list_add(&exp->exp_stale_list,
1868                                  &obd_stale_exports);
1869
1870                 spin_unlock(&obd_stale_export_lock);
1871                 spin_unlock_bh(&exp->exp_bl_list_lock);
1872         } else {
1873                 class_export_put(exp);
1874         }
1875         EXIT;
1876 }
1877 EXPORT_SYMBOL(obd_stale_export_put);
1878
1879 /**
1880  * Adjust the position of the export in the stale list,
1881  * i.e. move to the head of the list if is needed.
1882  **/
1883 void obd_stale_export_adjust(struct obd_export *exp)
1884 {
1885         LASSERT(exp != NULL);
1886         spin_lock_bh(&exp->exp_bl_list_lock);
1887         spin_lock(&obd_stale_export_lock);
1888
1889         if (!list_empty(&exp->exp_stale_list) &&
1890             !list_empty(&exp->exp_bl_list))
1891                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1892
1893         spin_unlock(&obd_stale_export_lock);
1894         spin_unlock_bh(&exp->exp_bl_list_lock);
1895 }
1896 EXPORT_SYMBOL(obd_stale_export_adjust);
1897
1898 /**
1899  * start destroy zombie import/export thread
1900  */
1901 int obd_zombie_impexp_init(void)
1902 {
1903         zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
1904                                            0, CFS_CPT_ANY,
1905                                            cfs_cpt_number(cfs_cpt_tab));
1906
1907         return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
1908 }
1909
1910 /**
1911  * stop destroy zombie import/export thread
1912  */
1913 void obd_zombie_impexp_stop(void)
1914 {
1915         destroy_workqueue(zombie_wq);
1916         LASSERT(list_empty(&obd_stale_exports));
1917 }
1918
1919 /***** Kernel-userspace comm helpers *******/
1920
1921 /* Get length of entire message, including header */
1922 int kuc_len(int payload_len)
1923 {
1924         return sizeof(struct kuc_hdr) + payload_len;
1925 }
1926 EXPORT_SYMBOL(kuc_len);
1927
1928 /* Get a pointer to kuc header, given a ptr to the payload
1929  * @param p Pointer to payload area
1930  * @returns Pointer to kuc header
1931  */
1932 struct kuc_hdr * kuc_ptr(void *p)
1933 {
1934         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1935         LASSERT(lh->kuc_magic == KUC_MAGIC);
1936         return lh;
1937 }
1938 EXPORT_SYMBOL(kuc_ptr);
1939
1940 /* Alloc space for a message, and fill in header
1941  * @return Pointer to payload area
1942  */
1943 void *kuc_alloc(int payload_len, int transport, int type)
1944 {
1945         struct kuc_hdr *lh;
1946         int len = kuc_len(payload_len);
1947
1948         OBD_ALLOC(lh, len);
1949         if (lh == NULL)
1950                 return ERR_PTR(-ENOMEM);
1951
1952         lh->kuc_magic = KUC_MAGIC;
1953         lh->kuc_transport = transport;
1954         lh->kuc_msgtype = type;
1955         lh->kuc_msglen = len;
1956
1957         return (void *)(lh + 1);
1958 }
1959 EXPORT_SYMBOL(kuc_alloc);
1960
/* Free a message given a pointer to its payload area; @payload_len is
 * used to compute the size that is released.
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh;

        lh = kuc_ptr(p);
        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1968
/* Bookkeeping for one thread sleeping in obd_get_request_slot() until an
 * RPC slot is handed to it; the instance lives on that thread's stack. */
struct obd_request_slot_waiter {
        /* link in cli->cl_flight_waiters while waiting */
        struct list_head        orsw_entry;
        /* wait queue the waiter sleeps on */
        wait_queue_head_t       orsw_waitq;
        /* when set, obd_get_request_slot() returns -EINTR */
        bool                    orsw_signaled;
};
1974
1975 static bool obd_request_slot_avail(struct client_obd *cli,
1976                                    struct obd_request_slot_waiter *orsw)
1977 {
1978         bool avail;
1979
1980         spin_lock(&cli->cl_loi_list_lock);
1981         avail = !!list_empty(&orsw->orsw_entry);
1982         spin_unlock(&cli->cl_loi_list_lock);
1983
1984         return avail;
1985 };
1986
1987 /*
1988  * For network flow control, the RPC sponsor needs to acquire a credit
1989  * before sending the RPC. The credits count for a connection is defined
1990  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1991  * the subsequent RPC sponsors need to wait until others released their
1992  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1993  */
1994 int obd_get_request_slot(struct client_obd *cli)
1995 {
1996         struct obd_request_slot_waiter   orsw;
1997         int                              rc;
1998
1999         spin_lock(&cli->cl_loi_list_lock);
2000         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2001                 cli->cl_rpcs_in_flight++;
2002                 spin_unlock(&cli->cl_loi_list_lock);
2003                 return 0;
2004         }
2005
2006         init_waitqueue_head(&orsw.orsw_waitq);
2007         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2008         orsw.orsw_signaled = false;
2009         spin_unlock(&cli->cl_loi_list_lock);
2010
2011         rc = l_wait_event_abortable(orsw.orsw_waitq,
2012                                     obd_request_slot_avail(cli, &orsw) ||
2013                                     orsw.orsw_signaled);
2014
2015         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2016          * freed but other (such as obd_put_request_slot) is using it. */
2017         spin_lock(&cli->cl_loi_list_lock);
2018         if (rc != 0) {
2019                 if (!orsw.orsw_signaled) {
2020                         if (list_empty(&orsw.orsw_entry))
2021                                 cli->cl_rpcs_in_flight--;
2022                         else
2023                                 list_del(&orsw.orsw_entry);
2024                 }
2025                 rc = -EINTR;
2026         }
2027
2028         if (orsw.orsw_signaled) {
2029                 LASSERT(list_empty(&orsw.orsw_entry));
2030
2031                 rc = -EINTR;
2032         }
2033         spin_unlock(&cli->cl_loi_list_lock);
2034
2035         return rc;
2036 }
2037 EXPORT_SYMBOL(obd_get_request_slot);
2038
2039 void obd_put_request_slot(struct client_obd *cli)
2040 {
2041         struct obd_request_slot_waiter *orsw;
2042
2043         spin_lock(&cli->cl_loi_list_lock);
2044         cli->cl_rpcs_in_flight--;
2045
2046         /* If there is free slot, wakeup the first waiter. */
2047         if (!list_empty(&cli->cl_flight_waiters) &&
2048             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2049                 orsw = list_first_entry(&cli->cl_flight_waiters,
2050                                         struct obd_request_slot_waiter,
2051                                         orsw_entry);
2052                 list_del_init(&orsw->orsw_entry);
2053                 cli->cl_rpcs_in_flight++;
2054                 wake_up(&orsw->orsw_waitq);
2055         }
2056         spin_unlock(&cli->cl_loi_list_lock);
2057 }
2058 EXPORT_SYMBOL(obd_put_request_slot);
2059
2060 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2061 {
2062         return cli->cl_max_rpcs_in_flight;
2063 }
2064 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2065
2066 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2067 {
2068         struct obd_request_slot_waiter *orsw;
2069         __u32                           old;
2070         int                             diff;
2071         int                             i;
2072         int                             rc;
2073
2074         if (max > OBD_MAX_RIF_MAX || max < 1)
2075                 return -ERANGE;
2076
2077         CDEBUG(D_INFO, "%s: max = %hu max_mod = %u rif = %u\n",
2078                cli->cl_import->imp_obd->obd_name, max,
2079                cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
2080
2081         if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
2082                    LUSTRE_MDC_NAME) == 0) {
2083                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2084                  * strictly lower that max_rpcs_in_flight */
2085                 if (max < 2) {
2086                         CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2087                                cli->cl_import->imp_obd->obd_name);
2088                         return -ERANGE;
2089                 }
2090                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2091                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2092                         if (rc != 0)
2093                                 return rc;
2094                 }
2095         }
2096
2097         spin_lock(&cli->cl_loi_list_lock);
2098         old = cli->cl_max_rpcs_in_flight;
2099         cli->cl_max_rpcs_in_flight = max;
2100         client_adjust_max_dirty(cli);
2101
2102         diff = max - old;
2103
2104         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2105         for (i = 0; i < diff; i++) {
2106                 orsw = list_first_entry_or_null(&cli->cl_loi_read_list,
2107                                                 struct obd_request_slot_waiter,
2108                                                 orsw_entry);
2109                 if (!orsw)
2110                         break;
2111
2112                 list_del_init(&orsw->orsw_entry);
2113                 cli->cl_rpcs_in_flight++;
2114                 wake_up(&orsw->orsw_waitq);
2115         }
2116         spin_unlock(&cli->cl_loi_list_lock);
2117
2118         return 0;
2119 }
2120 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2121
2122 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2123 {
2124         return cli->cl_max_mod_rpcs_in_flight;
2125 }
2126 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2127
2128 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2129 {
2130         struct obd_connect_data *ocd;
2131         __u16 maxmodrpcs;
2132         __u16 prev;
2133
2134         if (max > OBD_MAX_RIF_MAX || max < 1)
2135                 return -ERANGE;
2136
2137         ocd = &cli->cl_import->imp_connect_data;
2138         CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
2139                cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
2140                ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
2141
2142         if (max == OBD_MAX_RIF_MAX)
2143                 max = OBD_MAX_RIF_MAX - 1;
2144
2145         /* Cannot exceed or equal max_rpcs_in_flight.  If we are asked to
2146          * increase this value, also bump up max_rpcs_in_flight to match.
2147          */
2148         if (max >= cli->cl_max_rpcs_in_flight) {
2149                 CDEBUG(D_INFO,
2150                        "%s: increasing max_rpcs_in_flight=%hu to allow larger max_mod_rpcs_in_flight=%u\n",
2151                        cli->cl_import->imp_obd->obd_name, max + 1, max);
2152                 obd_set_max_rpcs_in_flight(cli, max + 1);
2153         }
2154
2155         /* cannot exceed max modify RPCs in flight supported by the server,
2156          * but verify ocd_connect_flags is at least initialized first.  If
2157          * not, allow it and fix value later in ptlrpc_connect_set_flags().
2158          */
2159         if (!ocd->ocd_connect_flags) {
2160                 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2161         } else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
2162                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2163                 if (maxmodrpcs == 0) { /* connection not finished yet */
2164                         maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2165                         CDEBUG(D_INFO,
2166                                "%s: partial connect, assume maxmodrpcs=%hu\n",
2167                                cli->cl_import->imp_obd->obd_name, maxmodrpcs);
2168                 }
2169         } else {
2170                 maxmodrpcs = 1;
2171         }
2172         if (max > maxmodrpcs) {
2173                 CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than ocd_maxmodrpcs=%hu returned by the server at connection\n",
2174                        cli->cl_import->imp_obd->obd_name,
2175                        max, maxmodrpcs);
2176                 return -ERANGE;
2177         }
2178
2179         spin_lock(&cli->cl_mod_rpcs_lock);
2180
2181         prev = cli->cl_max_mod_rpcs_in_flight;
2182         cli->cl_max_mod_rpcs_in_flight = max;
2183
2184         /* wakeup waiters if limit has been increased */
2185         if (cli->cl_max_mod_rpcs_in_flight > prev)
2186                 wake_up(&cli->cl_mod_rpcs_waitq);
2187
2188         spin_unlock(&cli->cl_mod_rpcs_lock);
2189
2190         return 0;
2191 }
2192 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2193
2194 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2195                                struct seq_file *seq)
2196 {
2197         unsigned long mod_tot = 0, mod_cum;
2198         int i;
2199
2200         spin_lock(&cli->cl_mod_rpcs_lock);
2201         lprocfs_stats_header(seq, ktime_get(), cli->cl_mod_rpcs_init, 25,
2202                              ":", true);
2203         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2204                    cli->cl_mod_rpcs_in_flight);
2205
2206         seq_printf(seq, "\n\t\t\tmodify\n");
2207         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2208
2209         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2210
2211         mod_cum = 0;
2212         for (i = 0; i < OBD_HIST_MAX; i++) {
2213                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2214
2215                 mod_cum += mod;
2216                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2217                            i, mod, pct(mod, mod_tot),
2218                            pct(mod_cum, mod_tot));
2219                 if (mod_cum == mod_tot)
2220                         break;
2221         }
2222
2223         spin_unlock(&cli->cl_mod_rpcs_lock);
2224
2225         return 0;
2226 }
2227 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2228
2229 /* The number of modify RPCs sent in parallel is limited
2230  * because the server has a finite number of slots per client to
2231  * store request result and ensure reply reconstruction when needed.
2232  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2233  * that takes into account server limit and cl_max_rpcs_in_flight
2234  * value.
2235  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2236  * one close request is allowed above the maximum.
2237  */
2238 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2239                                                  bool close_req)
2240 {
2241         bool avail;
2242
2243         /* A slot is available if
2244          * - number of modify RPCs in flight is less than the max
2245          * - it's a close RPC and no other close request is in flight
2246          */
2247         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2248                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2249
2250         return avail;
2251 }
2252
2253 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2254                                          bool close_req)
2255 {
2256         bool avail;
2257
2258         spin_lock(&cli->cl_mod_rpcs_lock);
2259         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2260         spin_unlock(&cli->cl_mod_rpcs_lock);
2261         return avail;
2262 }
2263
2264
2265 /* Get a modify RPC slot from the obd client @cli according
2266  * to the kind of operation @opc that is going to be sent
2267  * and the intent @it of the operation if it applies.
2268  * If the maximum number of modify RPCs in flight is reached
2269  * the thread is put to sleep.
2270  * Returns the tag to be set in the request message. Tag 0
2271  * is reserved for non-modifying requests.
2272  */
2273 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2274 {
2275         bool                    close_req = false;
2276         __u16                   i, max;
2277
2278         if (opc == MDS_CLOSE)
2279                 close_req = true;
2280
2281         do {
2282                 spin_lock(&cli->cl_mod_rpcs_lock);
2283                 max = cli->cl_max_mod_rpcs_in_flight;
2284                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2285                         /* there is a slot available */
2286                         cli->cl_mod_rpcs_in_flight++;
2287                         if (close_req)
2288                                 cli->cl_close_rpcs_in_flight++;
2289                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2290                                          cli->cl_mod_rpcs_in_flight);
2291                         /* find a free tag */
2292                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2293                                                 max + 1);
2294                         LASSERT(i < OBD_MAX_RIF_MAX);
2295                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2296                         spin_unlock(&cli->cl_mod_rpcs_lock);
2297                         /* tag 0 is reserved for non-modify RPCs */
2298
2299                         CDEBUG(D_RPCTRACE,
2300                                "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2301                                cli->cl_import->imp_obd->obd_name,
2302                                i + 1, opc, max);
2303
2304                         return i + 1;
2305                 }
2306                 spin_unlock(&cli->cl_mod_rpcs_lock);
2307
2308                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2309                        "opc %u, max %hu\n",
2310                        cli->cl_import->imp_obd->obd_name, opc, max);
2311
2312                 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2313                                           obd_mod_rpc_slot_avail(cli,
2314                                                                  close_req));
2315         } while (true);
2316 }
2317 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2318
2319 /* Put a modify RPC slot from the obd client @cli according
2320  * to the kind of operation @opc that has been sent.
2321  */
2322 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2323 {
2324         bool                    close_req = false;
2325
2326         if (tag == 0)
2327                 return;
2328
2329         if (opc == MDS_CLOSE)
2330                 close_req = true;
2331
2332         spin_lock(&cli->cl_mod_rpcs_lock);
2333         cli->cl_mod_rpcs_in_flight--;
2334         if (close_req)
2335                 cli->cl_close_rpcs_in_flight--;
2336         /* release the tag in the bitmap */
2337         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2338         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2339         spin_unlock(&cli->cl_mod_rpcs_lock);
2340         /* LU-14741 - to prevent close RPCs stuck behind normal ones */
2341         if (close_req)
2342                 wake_up_all(&cli->cl_mod_rpcs_waitq);
2343         else
2344                 wake_up(&cli->cl_mod_rpcs_waitq);
2345 }
2346 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2347