Whamcloud - gitweb
97672f979638d865bd533239f90d9286c75e9c4f
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
/* Taken around lookups in, and updates of, the obd_devs[] table below. */
DEFINE_RWLOCK(obd_dev_lock);
/* Registered devices, indexed by obd_minor; a NULL entry is a free slot. */
static struct obd_device *obd_devs[MAX_OBD_DEVICES];

/* Slab cache backing obd_device_alloc()/obd_device_free(). */
static struct kmem_cache *obd_device_cachep;
/* kobject type shared by every obd_type; defined later in this file. */
static struct kobj_type class_ktype;
static struct workqueue_struct *zombie_wq;

static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks, int debug_level);

static LIST_HEAD(obd_stale_exports);
static DEFINE_SPINLOCK(obd_stale_export_lock);
static atomic_t obd_stale_export_num = ATOMIC_INIT(0);

/*
 * Hook called by class_export_destroy() to drop an export's connection.
 * It is not assigned anywhere in this part of the file - presumably another
 * module installs it; confirm before relying on it being non-NULL.
 */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
67
68 /*
69  * support functions: we could use inter-module communication, but this
70  * is more portable to other OS's
71  */
72 static struct obd_device *obd_device_alloc(void)
73 {
74         struct obd_device *obd;
75
76         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
77         if (obd != NULL) {
78                 obd->obd_magic = OBD_DEVICE_MAGIC;
79         }
80         return obd;
81 }
82
83 static void obd_device_free(struct obd_device *obd)
84 {
85         LASSERT(obd != NULL);
86         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88         if (obd->obd_namespace != NULL) {
89                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90                        obd, obd->obd_namespace, obd->obd_force);
91                 LBUG();
92         }
93         lu_ref_fini(&obd->obd_reference);
94         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
95 }
96
97 struct obd_type *class_search_type(const char *name)
98 {
99         struct kobject *kobj = kset_find_obj(lustre_kset, name);
100
101         if (kobj && kobj->ktype == &class_ktype)
102                 return container_of(kobj, struct obd_type, typ_kobj);
103
104         kobject_put(kobj);
105         return NULL;
106 }
107 EXPORT_SYMBOL(class_search_type);
108
109 struct obd_type *class_get_type(const char *name)
110 {
111         struct obd_type *type;
112
113         type = class_search_type(name);
114 #ifdef HAVE_MODULE_LOADING_SUPPORT
115         if (!type) {
116                 const char *modname = name;
117
118 #ifdef HAVE_SERVER_SUPPORT
119                 if (strcmp(modname, "obdfilter") == 0)
120                         modname = "ofd";
121
122                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123                         modname = LUSTRE_OSP_NAME;
124
125                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126                         modname = LUSTRE_MDT_NAME;
127 #endif /* HAVE_SERVER_SUPPORT */
128
129                 if (!request_module("%s", modname)) {
130                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131                         type = class_search_type(name);
132                 } else {
133                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134                                            modname);
135                 }
136         }
137 #endif
138         if (type) {
139                 if (try_module_get(type->typ_dt_ops->o_owner)) {
140                         atomic_inc(&type->typ_refcnt);
141                         /* class_search_type() returned a counted reference,
142                          * but we don't need that count any more as
143                          * we have one through typ_refcnt.
144                          */
145                         kobject_put(&type->typ_kobj);
146                 } else {
147                         kobject_put(&type->typ_kobj);
148                         type = NULL;
149                 }
150         }
151         return type;
152 }
153
154 void class_put_type(struct obd_type *type)
155 {
156         LASSERT(type);
157         module_put(type->typ_dt_ops->o_owner);
158         atomic_dec(&type->typ_refcnt);
159 }
160
161 static void class_sysfs_release(struct kobject *kobj)
162 {
163         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
164
165         debugfs_remove_recursive(type->typ_debugfs_entry);
166         type->typ_debugfs_entry = NULL;
167
168         if (type->typ_lu)
169                 lu_device_type_fini(type->typ_lu);
170
171 #ifdef CONFIG_PROC_FS
172         if (type->typ_name && type->typ_procroot)
173                 remove_proc_subtree(type->typ_name, proc_lustre_root);
174 #endif
175         OBD_FREE(type, sizeof(*type));
176 }
177
178 static struct kobj_type class_ktype = {
179         .sysfs_ops      = &lustre_sysfs_ops,
180         .release        = class_sysfs_release,
181 };
182
#ifdef HAVE_SERVER_SUPPORT
/*
 * Create an obd_type kobject called \a name in lustre_kset together with a
 * debugfs directory of the same name, and mark it with typ_sym_filter so
 * that a later class_register_type() for the same name reuses it instead
 * of failing with -EEXIST.
 *
 * \param[in] name         name of the type to create
 * \param[in] enable_proc  also register a procfs directory for the type;
 *                         failure to do so is logged and otherwise ignored
 *
 * \retval type               the newly created obd_type
 * \retval ERR_PTR(-EEXIST)   a type with this name is already present
 * \retval ERR_PTR(-ENOMEM)   the obd_type could not be allocated
 * \retval ERR_PTR(errno)     kobject_init_and_add() failed
 */
struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
{
        struct dentry *symlink;
        struct obd_type *type;
        int rc;

        type = class_search_type(name);
        if (type) {
                kobject_put(&type->typ_kobj);
                return ERR_PTR(-EEXIST);
        }

        OBD_ALLOC(type, sizeof(*type));
        if (!type)
                return ERR_PTR(-ENOMEM);

        type->typ_kobj.kset = lustre_kset;
        rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
                                  &lustre_kset->kobj, "%s", name);
        if (rc) {
                /* A failed kobject_init_and_add() still leaves an
                 * initialised kobject behind.  Dropping its reference
                 * runs class_sysfs_release(), which frees \a type, so
                 * nothing is leaked.
                 */
                kobject_put(&type->typ_kobj);
                return ERR_PTR(rc);
        }

        symlink = debugfs_create_dir(name, debugfs_lustre_root);
        type->typ_debugfs_entry = symlink;
        type->typ_sym_filter = true;

        if (enable_proc) {
                type->typ_procroot = lprocfs_register(name, proc_lustre_root,
                                                      NULL, NULL);
                if (IS_ERR(type->typ_procroot)) {
                        CERROR("%s: can't create compat proc entry: %d\n",
                               name, (int)PTR_ERR(type->typ_procroot));
                        type->typ_procroot = NULL;
                }
        }

        return type;
}
EXPORT_SYMBOL(class_add_symlinks);
#endif /* HAVE_SERVER_SUPPORT */
224
/* Upper bound (exclusive) on the length of a type name. */
#define CLASS_MAX_NAME 1024

/*
 * Register an obd type called \a name.
 *
 * \param[in] dt_ops       device operations stored in typ_dt_ops
 * \param[in] md_ops       metadata operations stored in typ_md_ops
 * \param[in] enable_proc  register a procfs directory for the type
 *                         (only when CONFIG_PROC_FS is defined)
 * \param[in] vars         variables added to the type's debugfs directory
 * \param[in] name         type name; must be shorter than CLASS_MAX_NAME
 * \param[in] ldt          optional lu_device_type, initialised with
 *                         lu_device_type_init() and then stored in typ_lu
 *
 * \retval 0        success
 * \retval -EEXIST  a type with this name is already registered
 * \retval -ENOMEM  the obd_type could not be allocated
 * \retval errno    error from lprocfs_register(), kobject_add() or
 *                  lu_device_type_init()
 */
int class_register_type(const struct obd_ops *dt_ops,
                        const struct md_ops *md_ops,
                        bool enable_proc, struct lprocfs_vars *vars,
                        const char *name, struct lu_device_type *ldt)
{
        struct obd_type *type;
        int rc;

        ENTRY;
        /* sanity check */
        LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);

        type = class_search_type(name);
        if (type) {
#ifdef HAVE_SERVER_SUPPORT
                /* A type flagged typ_sym_filter (set by class_add_symlinks())
                 * is reused rather than rejected as a duplicate.
                 */
                if (type->typ_sym_filter)
                        goto dir_exist;
#endif /* HAVE_SERVER_SUPPORT */
                kobject_put(&type->typ_kobj);
                CDEBUG(D_IOCTL, "Type %s already registered\n", name);
                RETURN(-EEXIST);
        }

        OBD_ALLOC(type, sizeof(*type));
        if (type == NULL)
                RETURN(-ENOMEM);

        /* typ_lu holds OBD_LU_TYPE_SETUP until lu_device_type_init() below
         * has run and the final value is published.
         */
        type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
        type->typ_kobj.kset = lustre_kset;
        kobject_init(&type->typ_kobj, &class_ktype);
#ifdef HAVE_SERVER_SUPPORT
dir_exist:
#endif /* HAVE_SERVER_SUPPORT */

        type->typ_dt_ops = dt_ops;
        type->typ_md_ops = md_ops;

#ifdef HAVE_SERVER_SUPPORT
        if (type->typ_sym_filter) {
                /* Reused type: clear the flag, drop the reference returned
                 * by class_search_type() above and skip the procfs, debugfs
                 * and kobject_add() steps.
                 */
                type->typ_sym_filter = false;
                kobject_put(&type->typ_kobj);
                goto setup_ldt;
        }
#endif
#ifdef CONFIG_PROC_FS
        if (enable_proc && !type->typ_procroot) {
                type->typ_procroot = lprocfs_register(name,
                                                      proc_lustre_root,
                                                      NULL, type);
                if (IS_ERR(type->typ_procroot)) {
                        rc = PTR_ERR(type->typ_procroot);
                        /* reset so the release callback sees no procfs root */
                        type->typ_procroot = NULL;
                        GOTO(failed, rc);
                }
        }
#endif
        type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
        ldebugfs_add_vars(type->typ_debugfs_entry, vars, type);

        rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
        if (rc)
                GOTO(failed, rc);
#ifdef HAVE_SERVER_SUPPORT
setup_ldt:
#endif
        if (ldt) {
                rc = lu_device_type_init(ldt);
                /* Publish the result (NULL on failure) and wake anything
                 * waiting on typ_lu.
                 */
                smp_store_release(&type->typ_lu, rc ? NULL : ldt);
                wake_up_var(&type->typ_lu);
                if (rc)
                        GOTO(failed, rc);
        }

        RETURN(0);

failed:
        /* Drop a kobject reference on the type; the type is freed by
         * class_sysfs_release() once the last reference is gone.
         */
        kobject_put(&type->typ_kobj);

        RETURN(rc);
}
EXPORT_SYMBOL(class_register_type);
308
309 int class_unregister_type(const char *name)
310 {
311         struct obd_type *type = class_search_type(name);
312         int rc = 0;
313         ENTRY;
314
315         if (!type) {
316                 CERROR("unknown obd type\n");
317                 RETURN(-EINVAL);
318         }
319
320         if (atomic_read(&type->typ_refcnt)) {
321                 CERROR("type %s has refcount (%d)\n", name,
322                        atomic_read(&type->typ_refcnt));
323                 /* This is a bad situation, let's make the best of it */
324                 /* Remove ops, but leave the name for debugging */
325                 type->typ_dt_ops = NULL;
326                 type->typ_md_ops = NULL;
327                 GOTO(out_put, rc = -EBUSY);
328         }
329
330         /* Put the final ref */
331         kobject_put(&type->typ_kobj);
332 out_put:
333         /* Put the ref returned by class_search_type() */
334         kobject_put(&type->typ_kobj);
335
336         RETURN(rc);
337 } /* class_unregister_type */
338 EXPORT_SYMBOL(class_unregister_type);
339
340 /**
341  * Create a new obd device.
342  *
343  * Allocate the new obd_device and initialize it.
344  *
345  * \param[in] type_name obd device type string.
346  * \param[in] name      obd device name.
347  * \param[in] uuid      obd device UUID
348  *
349  * \retval newdev         pointer to created obd_device
350  * \retval ERR_PTR(errno) on error
351  */
352 struct obd_device *class_newdev(const char *type_name, const char *name,
353                                 const char *uuid)
354 {
355         struct obd_device *newdev;
356         struct obd_type *type = NULL;
357         ENTRY;
358
359         if (strlen(name) >= MAX_OBD_NAME) {
360                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
361                 RETURN(ERR_PTR(-EINVAL));
362         }
363
364         type = class_get_type(type_name);
365         if (type == NULL){
366                 CERROR("OBD: unknown type: %s\n", type_name);
367                 RETURN(ERR_PTR(-ENODEV));
368         }
369
370         newdev = obd_device_alloc();
371         if (newdev == NULL) {
372                 class_put_type(type);
373                 RETURN(ERR_PTR(-ENOMEM));
374         }
375         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
376         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
377         newdev->obd_type = type;
378         newdev->obd_minor = -1;
379
380         rwlock_init(&newdev->obd_pool_lock);
381         newdev->obd_pool_limit = 0;
382         newdev->obd_pool_slv = 0;
383
384         INIT_LIST_HEAD(&newdev->obd_exports);
385         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
386         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
387         INIT_LIST_HEAD(&newdev->obd_exports_timed);
388         INIT_LIST_HEAD(&newdev->obd_nid_stats);
389         spin_lock_init(&newdev->obd_nid_lock);
390         spin_lock_init(&newdev->obd_dev_lock);
391         mutex_init(&newdev->obd_dev_mutex);
392         spin_lock_init(&newdev->obd_osfs_lock);
393         /* newdev->obd_osfs_age must be set to a value in the distant
394          * past to guarantee a fresh statfs is fetched on mount. */
395         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
396
397         /* XXX belongs in setup not attach  */
398         init_rwsem(&newdev->obd_observer_link_sem);
399         /* recovery data */
400         spin_lock_init(&newdev->obd_recovery_task_lock);
401         init_waitqueue_head(&newdev->obd_next_transno_waitq);
402         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
403         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
404         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
405         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
406         INIT_LIST_HEAD(&newdev->obd_evict_list);
407         INIT_LIST_HEAD(&newdev->obd_lwp_list);
408
409         llog_group_init(&newdev->obd_olg);
410         /* Detach drops this */
411         atomic_set(&newdev->obd_refcount, 1);
412         lu_ref_init(&newdev->obd_reference);
413         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
414
415         newdev->obd_conn_inprogress = 0;
416
417         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
418
419         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
420                newdev->obd_name, newdev);
421
422         return newdev;
423 }
424
425 /**
426  * Free obd device.
427  *
428  * \param[in] obd obd_device to be freed
429  *
430  * \retval none
431  */
432 void class_free_dev(struct obd_device *obd)
433 {
434         struct obd_type *obd_type = obd->obd_type;
435
436         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
437                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
438         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
439                  "obd %p != obd_devs[%d] %p\n",
440                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
441         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
442                  "obd_refcount should be 0, not %d\n",
443                  atomic_read(&obd->obd_refcount));
444         LASSERT(obd_type != NULL);
445
446         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
447                obd->obd_name, obd->obd_type->typ_name);
448
449         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
450                          obd->obd_name, obd->obd_uuid.uuid);
451         if (obd->obd_stopping) {
452                 int err;
453
454                 /* If we're not stopping, we were never set up */
455                 err = obd_cleanup(obd);
456                 if (err)
457                         CERROR("Cleanup %s returned %d\n",
458                                 obd->obd_name, err);
459         }
460
461         obd_device_free(obd);
462
463         class_put_type(obd_type);
464 }
465
466 /**
467  * Unregister obd device.
468  *
469  * Free slot in obd_dev[] used by \a obd.
470  *
471  * \param[in] new_obd obd_device to be unregistered
472  *
473  * \retval none
474  */
475 void class_unregister_device(struct obd_device *obd)
476 {
477         write_lock(&obd_dev_lock);
478         if (obd->obd_minor >= 0) {
479                 LASSERT(obd_devs[obd->obd_minor] == obd);
480                 obd_devs[obd->obd_minor] = NULL;
481                 obd->obd_minor = -1;
482         }
483         write_unlock(&obd_dev_lock);
484 }
485
486 /**
487  * Register obd device.
488  *
489  * Find free slot in obd_devs[], fills it with \a new_obd.
490  *
491  * \param[in] new_obd obd_device to be registered
492  *
493  * \retval 0          success
494  * \retval -EEXIST    device with this name is registered
495  * \retval -EOVERFLOW obd_devs[] is full
496  */
497 int class_register_device(struct obd_device *new_obd)
498 {
499         int ret = 0;
500         int i;
501         int new_obd_minor = 0;
502         bool minor_assign = false;
503         bool retried = false;
504
505 again:
506         write_lock(&obd_dev_lock);
507         for (i = 0; i < class_devno_max(); i++) {
508                 struct obd_device *obd = class_num2obd(i);
509
510                 if (obd != NULL &&
511                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
512
513                         if (!retried) {
514                                 write_unlock(&obd_dev_lock);
515
516                                 /* the obd_device could be waited to be
517                                  * destroyed by the "obd_zombie_impexp_thread".
518                                  */
519                                 obd_zombie_barrier();
520                                 retried = true;
521                                 goto again;
522                         }
523
524                         CERROR("%s: already exists, won't add\n",
525                                obd->obd_name);
526                         /* in case we found a free slot before duplicate */
527                         minor_assign = false;
528                         ret = -EEXIST;
529                         break;
530                 }
531                 if (!minor_assign && obd == NULL) {
532                         new_obd_minor = i;
533                         minor_assign = true;
534                 }
535         }
536
537         if (minor_assign) {
538                 new_obd->obd_minor = new_obd_minor;
539                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
540                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
541                 obd_devs[new_obd_minor] = new_obd;
542         } else {
543                 if (ret == 0) {
544                         ret = -EOVERFLOW;
545                         CERROR("%s: all %u/%u devices used, increase "
546                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
547                                i, class_devno_max(), ret);
548                 }
549         }
550         write_unlock(&obd_dev_lock);
551
552         RETURN(ret);
553 }
554
555 static int class_name2dev_nolock(const char *name)
556 {
557         int i;
558
559         if (!name)
560                 return -1;
561
562         for (i = 0; i < class_devno_max(); i++) {
563                 struct obd_device *obd = class_num2obd(i);
564
565                 if (obd && strcmp(name, obd->obd_name) == 0) {
566                         /* Make sure we finished attaching before we give
567                            out any references */
568                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
569                         if (obd->obd_attached) {
570                                 return i;
571                         }
572                         break;
573                 }
574         }
575
576         return -1;
577 }
578
579 int class_name2dev(const char *name)
580 {
581         int i;
582
583         if (!name)
584                 return -1;
585
586         read_lock(&obd_dev_lock);
587         i = class_name2dev_nolock(name);
588         read_unlock(&obd_dev_lock);
589
590         return i;
591 }
592 EXPORT_SYMBOL(class_name2dev);
593
594 struct obd_device *class_name2obd(const char *name)
595 {
596         int dev = class_name2dev(name);
597
598         if (dev < 0 || dev > class_devno_max())
599                 return NULL;
600         return class_num2obd(dev);
601 }
602 EXPORT_SYMBOL(class_name2obd);
603
604 int class_uuid2dev_nolock(struct obd_uuid *uuid)
605 {
606         int i;
607
608         for (i = 0; i < class_devno_max(); i++) {
609                 struct obd_device *obd = class_num2obd(i);
610
611                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
612                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
613                         return i;
614                 }
615         }
616
617         return -1;
618 }
619
620 int class_uuid2dev(struct obd_uuid *uuid)
621 {
622         int i;
623
624         read_lock(&obd_dev_lock);
625         i = class_uuid2dev_nolock(uuid);
626         read_unlock(&obd_dev_lock);
627
628         return i;
629 }
630 EXPORT_SYMBOL(class_uuid2dev);
631
632 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
633 {
634         int dev = class_uuid2dev(uuid);
635         if (dev < 0)
636                 return NULL;
637         return class_num2obd(dev);
638 }
639 EXPORT_SYMBOL(class_uuid2obd);
640
641 /**
642  * Get obd device from ::obd_devs[]
643  *
644  * \param num [in] array index
645  *
646  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
647  *         otherwise return the obd device there.
648  */
649 struct obd_device *class_num2obd(int num)
650 {
651         struct obd_device *obd = NULL;
652
653         if (num < class_devno_max()) {
654                 obd = obd_devs[num];
655                 if (obd == NULL)
656                         return NULL;
657
658                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
659                          "%p obd_magic %08x != %08x\n",
660                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
661                 LASSERTF(obd->obd_minor == num,
662                          "%p obd_minor %0d != %0d\n",
663                          obd, obd->obd_minor, num);
664         }
665
666         return obd;
667 }
668
669 /**
670  * Find obd in obd_dev[] by name or uuid.
671  *
672  * Increment obd's refcount if found.
673  *
674  * \param[in] str obd name or uuid
675  *
676  * \retval NULL    if not found
677  * \retval target  pointer to found obd_device
678  */
679 struct obd_device *class_dev_by_str(const char *str)
680 {
681         struct obd_device *target = NULL;
682         struct obd_uuid tgtuuid;
683         int rc;
684
685         obd_str2uuid(&tgtuuid, str);
686
687         read_lock(&obd_dev_lock);
688         rc = class_uuid2dev_nolock(&tgtuuid);
689         if (rc < 0)
690                 rc = class_name2dev_nolock(str);
691
692         if (rc >= 0)
693                 target = class_num2obd(rc);
694
695         if (target != NULL)
696                 class_incref(target, "find", current);
697         read_unlock(&obd_dev_lock);
698
699         RETURN(target);
700 }
701 EXPORT_SYMBOL(class_dev_by_str);
702
703 /**
704  * Get obd devices count. Device in any
705  *    state are counted
706  * \retval obd device count
707  */
708 int get_devices_count(void)
709 {
710         int index, max_index = class_devno_max(), dev_count = 0;
711
712         read_lock(&obd_dev_lock);
713         for (index = 0; index <= max_index; index++) {
714                 struct obd_device *obd = class_num2obd(index);
715                 if (obd != NULL)
716                         dev_count++;
717         }
718         read_unlock(&obd_dev_lock);
719
720         return dev_count;
721 }
722 EXPORT_SYMBOL(get_devices_count);
723
724 void class_obd_list(void)
725 {
726         char *status;
727         int i;
728
729         read_lock(&obd_dev_lock);
730         for (i = 0; i < class_devno_max(); i++) {
731                 struct obd_device *obd = class_num2obd(i);
732
733                 if (obd == NULL)
734                         continue;
735                 if (obd->obd_stopping)
736                         status = "ST";
737                 else if (obd->obd_set_up)
738                         status = "UP";
739                 else if (obd->obd_attached)
740                         status = "AT";
741                 else
742                         status = "--";
743                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
744                          i, status, obd->obd_type->typ_name,
745                          obd->obd_name, obd->obd_uuid.uuid,
746                          atomic_read(&obd->obd_refcount));
747         }
748         read_unlock(&obd_dev_lock);
749 }
750
751 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
752  * specified, then only the client with that uuid is returned,
753  * otherwise any client connected to the tgt is returned.
754  */
755 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
756                                          const char *type_name,
757                                          struct obd_uuid *grp_uuid)
758 {
759         int i;
760
761         read_lock(&obd_dev_lock);
762         for (i = 0; i < class_devno_max(); i++) {
763                 struct obd_device *obd = class_num2obd(i);
764
765                 if (obd == NULL)
766                         continue;
767                 if ((strncmp(obd->obd_type->typ_name, type_name,
768                              strlen(type_name)) == 0)) {
769                         if (obd_uuid_equals(tgt_uuid,
770                                             &obd->u.cli.cl_target_uuid) &&
771                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
772                                                          &obd->obd_uuid) : 1)) {
773                                 read_unlock(&obd_dev_lock);
774                                 return obd;
775                         }
776                 }
777         }
778         read_unlock(&obd_dev_lock);
779
780         return NULL;
781 }
782 EXPORT_SYMBOL(class_find_client_obd);
783
784 /* Iterate the obd_device list looking devices have grp_uuid. Start
785  * searching at *next, and if a device is found, the next index to look
786  * at is saved in *next. If next is NULL, then the first matching device
787  * will always be returned.
788  */
789 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
790 {
791         int i;
792
793         if (next == NULL)
794                 i = 0;
795         else if (*next >= 0 && *next < class_devno_max())
796                 i = *next;
797         else
798                 return NULL;
799
800         read_lock(&obd_dev_lock);
801         for (; i < class_devno_max(); i++) {
802                 struct obd_device *obd = class_num2obd(i);
803
804                 if (obd == NULL)
805                         continue;
806                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
807                         if (next != NULL)
808                                 *next = i+1;
809                         read_unlock(&obd_dev_lock);
810                         return obd;
811                 }
812         }
813         read_unlock(&obd_dev_lock);
814
815         return NULL;
816 }
817 EXPORT_SYMBOL(class_devices_in_group);
818
819 /**
820  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
821  * adjust sptlrpc settings accordingly.
822  */
823 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
824 {
825         struct obd_device  *obd;
826         const char         *type;
827         int                 i, rc = 0, rc2;
828
829         LASSERT(namelen > 0);
830
831         read_lock(&obd_dev_lock);
832         for (i = 0; i < class_devno_max(); i++) {
833                 obd = class_num2obd(i);
834
835                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
836                         continue;
837
838                 /* only notify mdc, osc, osp, lwp, mdt, ost
839                  * because only these have a -sptlrpc llog */
840                 type = obd->obd_type->typ_name;
841                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
842                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
843                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
844                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
845                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
846                     strcmp(type, LUSTRE_OST_NAME) != 0)
847                         continue;
848
849                 if (strncmp(obd->obd_name, fsname, namelen))
850                         continue;
851
852                 class_incref(obd, __FUNCTION__, obd);
853                 read_unlock(&obd_dev_lock);
854                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
855                                          sizeof(KEY_SPTLRPC_CONF),
856                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
857                 rc = rc ? rc : rc2;
858                 class_decref(obd, __FUNCTION__, obd);
859                 read_lock(&obd_dev_lock);
860         }
861         read_unlock(&obd_dev_lock);
862         return rc;
863 }
864 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
865
866 void obd_cleanup_caches(void)
867 {
868         ENTRY;
869         if (obd_device_cachep) {
870                 kmem_cache_destroy(obd_device_cachep);
871                 obd_device_cachep = NULL;
872         }
873
874         EXIT;
875 }
876
877 int obd_init_caches(void)
878 {
879         int rc;
880         ENTRY;
881
882         LASSERT(obd_device_cachep == NULL);
883         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
884                                 sizeof(struct obd_device),
885                                 0, 0, 0, sizeof(struct obd_device), NULL);
886         if (!obd_device_cachep)
887                 GOTO(out, rc = -ENOMEM);
888
889         RETURN(0);
890 out:
891         obd_cleanup_caches();
892         RETURN(rc);
893 }
894
895 static const char export_handle_owner[] = "export";
896
897 /* map connection to client */
898 struct obd_export *class_conn2export(struct lustre_handle *conn)
899 {
900         struct obd_export *export;
901         ENTRY;
902
903         if (!conn) {
904                 CDEBUG(D_CACHE, "looking for null handle\n");
905                 RETURN(NULL);
906         }
907
908         if (conn->cookie == -1) {  /* this means assign a new connection */
909                 CDEBUG(D_CACHE, "want a new connection\n");
910                 RETURN(NULL);
911         }
912
913         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
914         export = class_handle2object(conn->cookie, export_handle_owner);
915         RETURN(export);
916 }
917 EXPORT_SYMBOL(class_conn2export);
918
919 struct obd_device *class_exp2obd(struct obd_export *exp)
920 {
921         if (exp)
922                 return exp->exp_obd;
923         return NULL;
924 }
925 EXPORT_SYMBOL(class_exp2obd);
926
927 struct obd_import *class_exp2cliimp(struct obd_export *exp)
928 {
929         struct obd_device *obd = exp->exp_obd;
930         if (obd == NULL)
931                 return NULL;
932         return obd->u.cli.cl_import;
933 }
934 EXPORT_SYMBOL(class_exp2cliimp);
935
936 /* Export management functions */
937 static void class_export_destroy(struct obd_export *exp)
938 {
939         struct obd_device *obd = exp->exp_obd;
940         ENTRY;
941
942         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
943         LASSERT(obd != NULL);
944
945         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
946                exp->exp_client_uuid.uuid, obd->obd_name);
947
948         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
949         if (exp->exp_connection)
950                 ptlrpc_put_connection_superhack(exp->exp_connection);
951
952         LASSERT(list_empty(&exp->exp_outstanding_replies));
953         LASSERT(list_empty(&exp->exp_uncommitted_replies));
954         LASSERT(list_empty(&exp->exp_req_replay_queue));
955         LASSERT(list_empty(&exp->exp_hp_rpcs));
956         obd_destroy_export(exp);
957         /* self export doesn't hold a reference to an obd, although it
958          * exists until freeing of the obd */
959         if (exp != obd->obd_self_export)
960                 class_decref(obd, "export", exp);
961
962         OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
963         kfree_rcu(exp, exp_handle.h_rcu);
964         EXIT;
965 }
966
967 struct obd_export *class_export_get(struct obd_export *exp)
968 {
969         refcount_inc(&exp->exp_handle.h_ref);
970         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
971                refcount_read(&exp->exp_handle.h_ref));
972         return exp;
973 }
974 EXPORT_SYMBOL(class_export_get);
975
976 void class_export_put(struct obd_export *exp)
977 {
978         LASSERT(exp != NULL);
979         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
980         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
981         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
982                refcount_read(&exp->exp_handle.h_ref) - 1);
983
984         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
985                 struct obd_device *obd = exp->exp_obd;
986
987                 CDEBUG(D_IOCTL, "final put %p/%s\n",
988                        exp, exp->exp_client_uuid.uuid);
989
990                 /* release nid stat refererence */
991                 lprocfs_exp_cleanup(exp);
992
993                 if (exp == obd->obd_self_export) {
994                         /* self export should be destroyed without
995                          * zombie thread as it doesn't hold a
996                          * reference to obd and doesn't hold any
997                          * resources */
998                         class_export_destroy(exp);
999                         /* self export is destroyed, no class
1000                          * references exist and it is safe to free
1001                          * obd */
1002                         class_free_dev(obd);
1003                 } else {
1004                         LASSERT(!list_empty(&exp->exp_obd_chain));
1005                         obd_zombie_export_add(exp);
1006                 }
1007
1008         }
1009 }
1010 EXPORT_SYMBOL(class_export_put);
1011
1012 static void obd_zombie_exp_cull(struct work_struct *ws)
1013 {
1014         struct obd_export *export;
1015
1016         export = container_of(ws, struct obd_export, exp_zombie_work);
1017         class_export_destroy(export);
1018 }
1019
1020 /* Creates a new export, adds it to the hash table, and returns a
1021  * pointer to it. The refcount is 2: one for the hash reference, and
1022  * one for the pointer returned by this function. */
1023 struct obd_export *__class_new_export(struct obd_device *obd,
1024                                       struct obd_uuid *cluuid, bool is_self)
1025 {
1026         struct obd_export *export;
1027         int rc = 0;
1028         ENTRY;
1029
1030         OBD_ALLOC_PTR(export);
1031         if (!export)
1032                 return ERR_PTR(-ENOMEM);
1033
1034         export->exp_conn_cnt = 0;
1035         export->exp_lock_hash = NULL;
1036         export->exp_flock_hash = NULL;
1037         /* 2 = class_handle_hash + last */
1038         refcount_set(&export->exp_handle.h_ref, 2);
1039         atomic_set(&export->exp_rpc_count, 0);
1040         atomic_set(&export->exp_cb_count, 0);
1041         atomic_set(&export->exp_locks_count, 0);
1042 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1043         INIT_LIST_HEAD(&export->exp_locks_list);
1044         spin_lock_init(&export->exp_locks_list_guard);
1045 #endif
1046         atomic_set(&export->exp_replay_count, 0);
1047         export->exp_obd = obd;
1048         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1049         spin_lock_init(&export->exp_uncommitted_replies_lock);
1050         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1051         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1052         INIT_HLIST_NODE(&export->exp_handle.h_link);
1053         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1054         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1055         class_handle_hash(&export->exp_handle, export_handle_owner);
1056         export->exp_last_request_time = ktime_get_real_seconds();
1057         spin_lock_init(&export->exp_lock);
1058         spin_lock_init(&export->exp_rpc_lock);
1059         INIT_HLIST_NODE(&export->exp_nid_hash);
1060         INIT_HLIST_NODE(&export->exp_gen_hash);
1061         spin_lock_init(&export->exp_bl_list_lock);
1062         INIT_LIST_HEAD(&export->exp_bl_list);
1063         INIT_LIST_HEAD(&export->exp_stale_list);
1064         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1065
1066         export->exp_sp_peer = LUSTRE_SP_ANY;
1067         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1068         export->exp_client_uuid = *cluuid;
1069         obd_init_export(export);
1070
1071         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1072
1073         spin_lock(&obd->obd_dev_lock);
1074         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1075                 /* shouldn't happen, but might race */
1076                 if (obd->obd_stopping)
1077                         GOTO(exit_unlock, rc = -ENODEV);
1078
1079                 rc = obd_uuid_add(obd, export);
1080                 if (rc != 0) {
1081                         LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1082                                       obd->obd_name, cluuid->uuid, rc);
1083                         GOTO(exit_unlock, rc = -EALREADY);
1084                 }
1085         }
1086
1087         if (!is_self) {
1088                 class_incref(obd, "export", export);
1089                 list_add_tail(&export->exp_obd_chain_timed,
1090                               &obd->obd_exports_timed);
1091                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1092                 obd->obd_num_exports++;
1093         } else {
1094                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1095                 INIT_LIST_HEAD(&export->exp_obd_chain);
1096         }
1097         spin_unlock(&obd->obd_dev_lock);
1098         RETURN(export);
1099
1100 exit_unlock:
1101         spin_unlock(&obd->obd_dev_lock);
1102         class_handle_unhash(&export->exp_handle);
1103         obd_destroy_export(export);
1104         OBD_FREE_PTR(export);
1105         return ERR_PTR(rc);
1106 }
1107
1108 struct obd_export *class_new_export(struct obd_device *obd,
1109                                     struct obd_uuid *uuid)
1110 {
1111         return __class_new_export(obd, uuid, false);
1112 }
1113 EXPORT_SYMBOL(class_new_export);
1114
1115 struct obd_export *class_new_export_self(struct obd_device *obd,
1116                                          struct obd_uuid *uuid)
1117 {
1118         return __class_new_export(obd, uuid, true);
1119 }
1120
1121 void class_unlink_export(struct obd_export *exp)
1122 {
1123         class_handle_unhash(&exp->exp_handle);
1124
1125         if (exp->exp_obd->obd_self_export == exp) {
1126                 class_export_put(exp);
1127                 return;
1128         }
1129
1130         spin_lock(&exp->exp_obd->obd_dev_lock);
1131         /* delete an uuid-export hashitem from hashtables */
1132         if (exp != exp->exp_obd->obd_self_export)
1133                 obd_uuid_del(exp->exp_obd, exp);
1134
1135 #ifdef HAVE_SERVER_SUPPORT
1136         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1137                 struct tg_export_data   *ted = &exp->exp_target_data;
1138                 struct cfs_hash         *hash;
1139
1140                 /* Because obd_gen_hash will not be released until
1141                  * class_cleanup(), so hash should never be NULL here */
1142                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1143                 LASSERT(hash != NULL);
1144                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1145                              &exp->exp_gen_hash);
1146                 cfs_hash_putref(hash);
1147         }
1148 #endif /* HAVE_SERVER_SUPPORT */
1149
1150         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1151         list_del_init(&exp->exp_obd_chain_timed);
1152         exp->exp_obd->obd_num_exports--;
1153         spin_unlock(&exp->exp_obd->obd_dev_lock);
1154         atomic_inc(&obd_stale_export_num);
1155
1156         /* A reference is kept by obd_stale_exports list */
1157         obd_stale_export_put(exp);
1158 }
1159 EXPORT_SYMBOL(class_unlink_export);
1160
1161 /* Import management functions */
1162 static void obd_zombie_import_free(struct obd_import *imp)
1163 {
1164         ENTRY;
1165
1166         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1167                 imp->imp_obd->obd_name);
1168
1169         LASSERT(refcount_read(&imp->imp_refcount) == 0);
1170
1171         ptlrpc_put_connection_superhack(imp->imp_connection);
1172
1173         while (!list_empty(&imp->imp_conn_list)) {
1174                 struct obd_import_conn *imp_conn;
1175
1176                 imp_conn = list_first_entry(&imp->imp_conn_list,
1177                                             struct obd_import_conn, oic_item);
1178                 list_del_init(&imp_conn->oic_item);
1179                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1180                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1181         }
1182
1183         LASSERT(imp->imp_sec == NULL);
1184         class_decref(imp->imp_obd, "import", imp);
1185         OBD_FREE_PTR(imp);
1186         EXIT;
1187 }
1188
1189 struct obd_import *class_import_get(struct obd_import *import)
1190 {
1191         refcount_inc(&import->imp_refcount);
1192         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1193                refcount_read(&import->imp_refcount),
1194                import->imp_obd->obd_name);
1195         return import;
1196 }
1197 EXPORT_SYMBOL(class_import_get);
1198
1199 void class_import_put(struct obd_import *imp)
1200 {
1201         ENTRY;
1202
1203         LASSERT(refcount_read(&imp->imp_refcount) > 0);
1204
1205         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1206                refcount_read(&imp->imp_refcount) - 1,
1207                imp->imp_obd->obd_name);
1208
1209         if (refcount_dec_and_test(&imp->imp_refcount)) {
1210                 CDEBUG(D_INFO, "final put import %p\n", imp);
1211                 obd_zombie_import_add(imp);
1212         }
1213
1214         EXIT;
1215 }
1216 EXPORT_SYMBOL(class_import_put);
1217
1218 static void init_imp_at(struct imp_at *at) {
1219         int i;
1220         at_init(&at->iat_net_latency, 0, 0);
1221         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1222                 /* max service estimates are tracked on the server side, so
1223                    don't use the AT history here, just use the last reported
1224                    val. (But keep hist for proc histogram, worst_ever) */
1225                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1226                         AT_FLG_NOHIST);
1227         }
1228 }
1229
1230 static void obd_zombie_imp_cull(struct work_struct *ws)
1231 {
1232         struct obd_import *import;
1233
1234         import = container_of(ws, struct obd_import, imp_zombie_work);
1235         obd_zombie_import_free(import);
1236 }
1237
1238 struct obd_import *class_new_import(struct obd_device *obd)
1239 {
1240         struct obd_import *imp;
1241         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1242
1243         OBD_ALLOC(imp, sizeof(*imp));
1244         if (imp == NULL)
1245                 return NULL;
1246
1247         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1248         INIT_LIST_HEAD(&imp->imp_replay_list);
1249         INIT_LIST_HEAD(&imp->imp_sending_list);
1250         INIT_LIST_HEAD(&imp->imp_delayed_list);
1251         INIT_LIST_HEAD(&imp->imp_committed_list);
1252         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1253         imp->imp_known_replied_xid = 0;
1254         imp->imp_replay_cursor = &imp->imp_committed_list;
1255         spin_lock_init(&imp->imp_lock);
1256         imp->imp_last_success_conn = 0;
1257         imp->imp_state = LUSTRE_IMP_NEW;
1258         imp->imp_obd = class_incref(obd, "import", imp);
1259         rwlock_init(&imp->imp_sec_lock);
1260         init_waitqueue_head(&imp->imp_recovery_waitq);
1261         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1262
1263         if (curr_pid_ns && curr_pid_ns->child_reaper)
1264                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1265         else
1266                 imp->imp_sec_refpid = 1;
1267
1268         refcount_set(&imp->imp_refcount, 2);
1269         atomic_set(&imp->imp_unregistering, 0);
1270         atomic_set(&imp->imp_inflight, 0);
1271         atomic_set(&imp->imp_replay_inflight, 0);
1272         atomic_set(&imp->imp_inval_count, 0);
1273         INIT_LIST_HEAD(&imp->imp_conn_list);
1274         init_imp_at(&imp->imp_at);
1275
1276         /* the default magic is V2, will be used in connect RPC, and
1277          * then adjusted according to the flags in request/reply. */
1278         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1279
1280         return imp;
1281 }
1282 EXPORT_SYMBOL(class_new_import);
1283
1284 void class_destroy_import(struct obd_import *import)
1285 {
1286         LASSERT(import != NULL);
1287         LASSERT(import != LP_POISON);
1288
1289         spin_lock(&import->imp_lock);
1290         import->imp_generation++;
1291         spin_unlock(&import->imp_lock);
1292         class_import_put(import);
1293 }
1294 EXPORT_SYMBOL(class_destroy_import);
1295
1296 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1297
1298 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1299 {
1300         spin_lock(&exp->exp_locks_list_guard);
1301
1302         LASSERT(lock->l_exp_refs_nr >= 0);
1303
1304         if (lock->l_exp_refs_target != NULL &&
1305             lock->l_exp_refs_target != exp) {
1306                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1307                               exp, lock, lock->l_exp_refs_target);
1308         }
1309         if ((lock->l_exp_refs_nr ++) == 0) {
1310                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1311                 lock->l_exp_refs_target = exp;
1312         }
1313         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1314                lock, exp, lock->l_exp_refs_nr);
1315         spin_unlock(&exp->exp_locks_list_guard);
1316 }
1317 EXPORT_SYMBOL(__class_export_add_lock_ref);
1318
1319 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1320 {
1321         spin_lock(&exp->exp_locks_list_guard);
1322         LASSERT(lock->l_exp_refs_nr > 0);
1323         if (lock->l_exp_refs_target != exp) {
1324                 LCONSOLE_WARN("lock %p, "
1325                               "mismatching export pointers: %p, %p\n",
1326                               lock, lock->l_exp_refs_target, exp);
1327         }
1328         if (-- lock->l_exp_refs_nr == 0) {
1329                 list_del_init(&lock->l_exp_refs_link);
1330                 lock->l_exp_refs_target = NULL;
1331         }
1332         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1333                lock, exp, lock->l_exp_refs_nr);
1334         spin_unlock(&exp->exp_locks_list_guard);
1335 }
1336 EXPORT_SYMBOL(__class_export_del_lock_ref);
1337 #endif
1338
1339 /* A connection defines an export context in which preallocation can
1340    be managed. This releases the export pointer reference, and returns
1341    the export handle, so the export refcount is 1 when this function
1342    returns. */
1343 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1344                   struct obd_uuid *cluuid)
1345 {
1346         struct obd_export *export;
1347         LASSERT(conn != NULL);
1348         LASSERT(obd != NULL);
1349         LASSERT(cluuid != NULL);
1350         ENTRY;
1351
1352         export = class_new_export(obd, cluuid);
1353         if (IS_ERR(export))
1354                 RETURN(PTR_ERR(export));
1355
1356         conn->cookie = export->exp_handle.h_cookie;
1357         class_export_put(export);
1358
1359         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1360                cluuid->uuid, conn->cookie);
1361         RETURN(0);
1362 }
1363 EXPORT_SYMBOL(class_connect);
1364
1365 /* if export is involved in recovery then clean up related things */
1366 static void class_export_recovery_cleanup(struct obd_export *exp)
1367 {
1368         struct obd_device *obd = exp->exp_obd;
1369
1370         spin_lock(&obd->obd_recovery_task_lock);
1371         if (obd->obd_recovering) {
1372                 if (exp->exp_in_recovery) {
1373                         spin_lock(&exp->exp_lock);
1374                         exp->exp_in_recovery = 0;
1375                         spin_unlock(&exp->exp_lock);
1376                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1377                         atomic_dec(&obd->obd_connected_clients);
1378                 }
1379
1380                 /* if called during recovery then should update
1381                  * obd_stale_clients counter,
1382                  * lightweight exports are not counted */
1383                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1384                         exp->exp_obd->obd_stale_clients++;
1385         }
1386         spin_unlock(&obd->obd_recovery_task_lock);
1387
1388         spin_lock(&exp->exp_lock);
1389         /** Cleanup req replay fields */
1390         if (exp->exp_req_replay_needed) {
1391                 exp->exp_req_replay_needed = 0;
1392
1393                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1394                 atomic_dec(&obd->obd_req_replay_clients);
1395         }
1396
1397         /** Cleanup lock replay data */
1398         if (exp->exp_lock_replay_needed) {
1399                 exp->exp_lock_replay_needed = 0;
1400
1401                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1402                 atomic_dec(&obd->obd_lock_replay_clients);
1403         }
1404         spin_unlock(&exp->exp_lock);
1405 }
1406
1407 /* This function removes 1-3 references from the export:
1408  * 1 - for export pointer passed
1409  * and if disconnect really need
1410  * 2 - removing from hash
1411  * 3 - in client_unlink_export
1412  * The export pointer passed to this function can destroyed */
1413 int class_disconnect(struct obd_export *export)
1414 {
1415         int already_disconnected;
1416         ENTRY;
1417
1418         if (export == NULL) {
1419                 CWARN("attempting to free NULL export %p\n", export);
1420                 RETURN(-EINVAL);
1421         }
1422
1423         spin_lock(&export->exp_lock);
1424         already_disconnected = export->exp_disconnected;
1425         export->exp_disconnected = 1;
1426         /*  We hold references of export for uuid hash
1427          *  and nid_hash and export link at least. So
1428          *  it is safe to call cfs_hash_del in there.  */
1429         if (!hlist_unhashed(&export->exp_nid_hash))
1430                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1431                              &export->exp_connection->c_peer.nid,
1432                              &export->exp_nid_hash);
1433         spin_unlock(&export->exp_lock);
1434
1435         /* class_cleanup(), abort_recovery(), and class_fail_export()
1436          * all end up in here, and if any of them race we shouldn't
1437          * call extra class_export_puts(). */
1438         if (already_disconnected) {
1439                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1440                 GOTO(no_disconn, already_disconnected);
1441         }
1442
1443         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1444                export->exp_handle.h_cookie);
1445
1446         class_export_recovery_cleanup(export);
1447         class_unlink_export(export);
1448 no_disconn:
1449         class_export_put(export);
1450         RETURN(0);
1451 }
1452 EXPORT_SYMBOL(class_disconnect);
1453
1454 /* Return non-zero for a fully connected export */
1455 int class_connected_export(struct obd_export *exp)
1456 {
1457         int connected = 0;
1458
1459         if (exp) {
1460                 spin_lock(&exp->exp_lock);
1461                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1462                 spin_unlock(&exp->exp_lock);
1463         }
1464         return connected;
1465 }
1466 EXPORT_SYMBOL(class_connected_export);
1467
1468 static void class_disconnect_export_list(struct list_head *list,
1469                                          enum obd_option flags)
1470 {
1471         int rc;
1472         struct obd_export *exp;
1473         ENTRY;
1474
1475         /* It's possible that an export may disconnect itself, but
1476          * nothing else will be added to this list. */
1477         while (!list_empty(list)) {
1478                 exp = list_first_entry(list, struct obd_export,
1479                                        exp_obd_chain);
1480                 /* need for safe call CDEBUG after obd_disconnect */
1481                 class_export_get(exp);
1482
1483                 spin_lock(&exp->exp_lock);
1484                 exp->exp_flags = flags;
1485                 spin_unlock(&exp->exp_lock);
1486
1487                 if (obd_uuid_equals(&exp->exp_client_uuid,
1488                                     &exp->exp_obd->obd_uuid)) {
1489                         CDEBUG(D_HA,
1490                                "exp %p export uuid == obd uuid, don't discon\n",
1491                                exp);
1492                         /* Need to delete this now so we don't end up pointing
1493                          * to work_list later when this export is cleaned up. */
1494                         list_del_init(&exp->exp_obd_chain);
1495                         class_export_put(exp);
1496                         continue;
1497                 }
1498
1499                 class_export_get(exp);
1500                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1501                        "last request at %lld\n",
1502                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1503                        exp, exp->exp_last_request_time);
1504                 /* release one export reference anyway */
1505                 rc = obd_disconnect(exp);
1506
1507                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1508                        obd_export_nid2str(exp), exp, rc);
1509                 class_export_put(exp);
1510         }
1511         EXIT;
1512 }
1513
1514 void class_disconnect_exports(struct obd_device *obd)
1515 {
1516         LIST_HEAD(work_list);
1517         ENTRY;
1518
1519         /* Move all of the exports from obd_exports to a work list, en masse. */
1520         spin_lock(&obd->obd_dev_lock);
1521         list_splice_init(&obd->obd_exports, &work_list);
1522         list_splice_init(&obd->obd_delayed_exports, &work_list);
1523         spin_unlock(&obd->obd_dev_lock);
1524
1525         if (!list_empty(&work_list)) {
1526                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1527                        "disconnecting them\n", obd->obd_minor, obd);
1528                 class_disconnect_export_list(&work_list,
1529                                              exp_flags_from_obd(obd));
1530         } else
1531                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1532                        obd->obd_minor, obd);
1533         EXIT;
1534 }
1535 EXPORT_SYMBOL(class_disconnect_exports);
1536
1537 /* Remove exports that have not completed recovery.
1538  */
1539 void class_disconnect_stale_exports(struct obd_device *obd,
1540                                     int (*test_export)(struct obd_export *))
1541 {
1542         LIST_HEAD(work_list);
1543         struct obd_export *exp, *n;
1544         int evicted = 0;
1545         ENTRY;
1546
1547         spin_lock(&obd->obd_dev_lock);
1548         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1549                                  exp_obd_chain) {
1550                 /* don't count self-export as client */
1551                 if (obd_uuid_equals(&exp->exp_client_uuid,
1552                                     &exp->exp_obd->obd_uuid))
1553                         continue;
1554
1555                 /* don't evict clients which have no slot in last_rcvd
1556                  * (e.g. lightweight connection) */
1557                 if (exp->exp_target_data.ted_lr_idx == -1)
1558                         continue;
1559
1560                 spin_lock(&exp->exp_lock);
1561                 if (exp->exp_failed || test_export(exp)) {
1562                         spin_unlock(&exp->exp_lock);
1563                         continue;
1564                 }
1565                 exp->exp_failed = 1;
1566                 spin_unlock(&exp->exp_lock);
1567
1568                 list_move(&exp->exp_obd_chain, &work_list);
1569                 evicted++;
1570                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1571                        obd->obd_name, exp->exp_client_uuid.uuid,
1572                        obd_export_nid2str(exp));
1573                 print_export_data(exp, "EVICTING", 0, D_HA);
1574         }
1575         spin_unlock(&obd->obd_dev_lock);
1576
1577         if (evicted)
1578                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1579                               obd->obd_name, evicted);
1580
1581         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1582                                                  OBD_OPT_ABORT_RECOV);
1583         EXIT;
1584 }
1585 EXPORT_SYMBOL(class_disconnect_stale_exports);
1586
1587 void class_fail_export(struct obd_export *exp)
1588 {
1589         int rc, already_failed;
1590
1591         spin_lock(&exp->exp_lock);
1592         already_failed = exp->exp_failed;
1593         exp->exp_failed = 1;
1594         spin_unlock(&exp->exp_lock);
1595
1596         if (already_failed) {
1597                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1598                        exp, exp->exp_client_uuid.uuid);
1599                 return;
1600         }
1601
1602         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1603                exp, exp->exp_client_uuid.uuid);
1604
1605         if (obd_dump_on_timeout)
1606                 libcfs_debug_dumplog();
1607
1608         /* need for safe call CDEBUG after obd_disconnect */
1609         class_export_get(exp);
1610
1611         /* Most callers into obd_disconnect are removing their own reference
1612          * (request, for example) in addition to the one from the hash table.
1613          * We don't have such a reference here, so make one. */
1614         class_export_get(exp);
1615         rc = obd_disconnect(exp);
1616         if (rc)
1617                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1618         else
1619                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1620                        exp, exp->exp_client_uuid.uuid);
1621         class_export_put(exp);
1622 }
1623 EXPORT_SYMBOL(class_fail_export);
1624
1625 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1626 {
1627         struct cfs_hash *nid_hash;
1628         struct obd_export *doomed_exp = NULL;
1629         int exports_evicted = 0;
1630
1631         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1632
1633         spin_lock(&obd->obd_dev_lock);
1634         /* umount has run already, so evict thread should leave
1635          * its task to umount thread now */
1636         if (obd->obd_stopping) {
1637                 spin_unlock(&obd->obd_dev_lock);
1638                 return exports_evicted;
1639         }
1640         nid_hash = obd->obd_nid_hash;
1641         cfs_hash_getref(nid_hash);
1642         spin_unlock(&obd->obd_dev_lock);
1643
1644         do {
1645                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1646                 if (doomed_exp == NULL)
1647                         break;
1648
1649                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1650                          "nid %s found, wanted nid %s, requested nid %s\n",
1651                          obd_export_nid2str(doomed_exp),
1652                          libcfs_nid2str(nid_key), nid);
1653                 LASSERTF(doomed_exp != obd->obd_self_export,
1654                          "self-export is hashed by NID?\n");
1655                 exports_evicted++;
1656                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1657                               "request\n", obd->obd_name,
1658                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1659                               obd_export_nid2str(doomed_exp));
1660                 class_fail_export(doomed_exp);
1661                 class_export_put(doomed_exp);
1662         } while (1);
1663
1664         cfs_hash_putref(nid_hash);
1665
1666         if (!exports_evicted)
1667                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1668                        obd->obd_name, nid);
1669         return exports_evicted;
1670 }
1671 EXPORT_SYMBOL(obd_export_evict_by_nid);
1672
1673 #ifdef HAVE_SERVER_SUPPORT
1674 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1675 {
1676         struct obd_export *doomed_exp = NULL;
1677         struct obd_uuid doomed_uuid;
1678         int exports_evicted = 0;
1679
1680         spin_lock(&obd->obd_dev_lock);
1681         if (obd->obd_stopping) {
1682                 spin_unlock(&obd->obd_dev_lock);
1683                 return exports_evicted;
1684         }
1685         spin_unlock(&obd->obd_dev_lock);
1686
1687         obd_str2uuid(&doomed_uuid, uuid);
1688         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1689                 CERROR("%s: can't evict myself\n", obd->obd_name);
1690                 return exports_evicted;
1691         }
1692
1693         doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1694         if (doomed_exp == NULL) {
1695                 CERROR("%s: can't disconnect %s: no exports found\n",
1696                        obd->obd_name, uuid);
1697         } else {
1698                 CWARN("%s: evicting %s at adminstrative request\n",
1699                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1700                 class_fail_export(doomed_exp);
1701                 class_export_put(doomed_exp);
1702                 obd_uuid_del(obd, doomed_exp);
1703                 exports_evicted++;
1704         }
1705
1706         return exports_evicted;
1707 }
1708 #endif /* HAVE_SERVER_SUPPORT */
1709
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump callback; unset (NULL) until someone installs
 * one.  print_export_data() invokes it when lock dumping is requested. */
void (*class_export_dump_hook)(struct obd_export *exp);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1714
1715 static void print_export_data(struct obd_export *exp, const char *status,
1716                               int locks, int debug_level)
1717 {
1718         struct ptlrpc_reply_state *rs;
1719         struct ptlrpc_reply_state *first_reply = NULL;
1720         int nreplies = 0;
1721
1722         spin_lock(&exp->exp_lock);
1723         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1724                             rs_exp_list) {
1725                 if (nreplies == 0)
1726                         first_reply = rs;
1727                 nreplies++;
1728         }
1729         spin_unlock(&exp->exp_lock);
1730
1731         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1732                "%p %s %llu stale:%d\n",
1733                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1734                obd_export_nid2str(exp),
1735                refcount_read(&exp->exp_handle.h_ref),
1736                atomic_read(&exp->exp_rpc_count),
1737                atomic_read(&exp->exp_cb_count),
1738                atomic_read(&exp->exp_locks_count),
1739                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1740                nreplies, first_reply, nreplies > 3 ? "..." : "",
1741                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1742 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1743         if (locks && class_export_dump_hook != NULL)
1744                 class_export_dump_hook(exp);
1745 #endif
1746 }
1747
1748 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1749 {
1750         struct obd_export *exp;
1751
1752         spin_lock(&obd->obd_dev_lock);
1753         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1754                 print_export_data(exp, "ACTIVE", locks, debug_level);
1755         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1756                 print_export_data(exp, "UNLINKED", locks, debug_level);
1757         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1758                 print_export_data(exp, "DELAYED", locks, debug_level);
1759         spin_unlock(&obd->obd_dev_lock);
1760 }
1761
1762 void obd_exports_barrier(struct obd_device *obd)
1763 {
1764         int waited = 2;
1765         LASSERT(list_empty(&obd->obd_exports));
1766         spin_lock(&obd->obd_dev_lock);
1767         while (!list_empty(&obd->obd_unlinked_exports)) {
1768                 spin_unlock(&obd->obd_dev_lock);
1769                 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1770                 if (waited > 5 && is_power_of_2(waited)) {
1771                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1772                                       "more than %d seconds. "
1773                                       "The obd refcount = %d. Is it stuck?\n",
1774                                       obd->obd_name, waited,
1775                                       atomic_read(&obd->obd_refcount));
1776                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1777                 }
1778                 waited *= 2;
1779                 spin_lock(&obd->obd_dev_lock);
1780         }
1781         spin_unlock(&obd->obd_dev_lock);
1782 }
1783 EXPORT_SYMBOL(obd_exports_barrier);
1784
1785 /**
1786  * Add export to the obd_zombe thread and notify it.
1787  */
1788 static void obd_zombie_export_add(struct obd_export *exp) {
1789         atomic_dec(&obd_stale_export_num);
1790         spin_lock(&exp->exp_obd->obd_dev_lock);
1791         LASSERT(!list_empty(&exp->exp_obd_chain));
1792         list_del_init(&exp->exp_obd_chain);
1793         spin_unlock(&exp->exp_obd->obd_dev_lock);
1794
1795         queue_work(zombie_wq, &exp->exp_zombie_work);
1796 }
1797
1798 /**
1799  * Add import to the obd_zombe thread and notify it.
1800  */
1801 static void obd_zombie_import_add(struct obd_import *imp) {
1802         LASSERT(imp->imp_sec == NULL);
1803
1804         queue_work(zombie_wq, &imp->imp_zombie_work);
1805 }
1806
1807 /**
1808  * wait when obd_zombie import/export queues become empty
1809  */
1810 void obd_zombie_barrier(void)
1811 {
1812         flush_workqueue(zombie_wq);
1813 }
1814 EXPORT_SYMBOL(obd_zombie_barrier);
1815
1816
1817 struct obd_export *obd_stale_export_get(void)
1818 {
1819         struct obd_export *exp = NULL;
1820         ENTRY;
1821
1822         spin_lock(&obd_stale_export_lock);
1823         if (!list_empty(&obd_stale_exports)) {
1824                 exp = list_first_entry(&obd_stale_exports,
1825                                        struct obd_export, exp_stale_list);
1826                 list_del_init(&exp->exp_stale_list);
1827         }
1828         spin_unlock(&obd_stale_export_lock);
1829
1830         if (exp) {
1831                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1832                        atomic_read(&obd_stale_export_num));
1833         }
1834         RETURN(exp);
1835 }
1836 EXPORT_SYMBOL(obd_stale_export_get);
1837
1838 void obd_stale_export_put(struct obd_export *exp)
1839 {
1840         ENTRY;
1841
1842         LASSERT(list_empty(&exp->exp_stale_list));
1843         if (exp->exp_lock_hash &&
1844             atomic_read(&exp->exp_lock_hash->hs_count)) {
1845                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1846                        atomic_read(&obd_stale_export_num));
1847
1848                 spin_lock_bh(&exp->exp_bl_list_lock);
1849                 spin_lock(&obd_stale_export_lock);
1850                 /* Add to the tail if there is no blocked locks,
1851                  * to the head otherwise. */
1852                 if (list_empty(&exp->exp_bl_list))
1853                         list_add_tail(&exp->exp_stale_list,
1854                                       &obd_stale_exports);
1855                 else
1856                         list_add(&exp->exp_stale_list,
1857                                  &obd_stale_exports);
1858
1859                 spin_unlock(&obd_stale_export_lock);
1860                 spin_unlock_bh(&exp->exp_bl_list_lock);
1861         } else {
1862                 class_export_put(exp);
1863         }
1864         EXIT;
1865 }
1866 EXPORT_SYMBOL(obd_stale_export_put);
1867
1868 /**
1869  * Adjust the position of the export in the stale list,
1870  * i.e. move to the head of the list if is needed.
1871  **/
1872 void obd_stale_export_adjust(struct obd_export *exp)
1873 {
1874         LASSERT(exp != NULL);
1875         spin_lock_bh(&exp->exp_bl_list_lock);
1876         spin_lock(&obd_stale_export_lock);
1877
1878         if (!list_empty(&exp->exp_stale_list) &&
1879             !list_empty(&exp->exp_bl_list))
1880                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1881
1882         spin_unlock(&obd_stale_export_lock);
1883         spin_unlock_bh(&exp->exp_bl_list_lock);
1884 }
1885 EXPORT_SYMBOL(obd_stale_export_adjust);
1886
1887 /**
1888  * start destroy zombie import/export thread
1889  */
1890 int obd_zombie_impexp_init(void)
1891 {
1892         zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1893         if (!zombie_wq)
1894                 return -ENOMEM;
1895
1896         return 0;
1897 }
1898
1899 /**
1900  * stop destroy zombie import/export thread
1901  */
1902 void obd_zombie_impexp_stop(void)
1903 {
1904         destroy_workqueue(zombie_wq);
1905         LASSERT(list_empty(&obd_stale_exports));
1906 }
1907
1908 /***** Kernel-userspace comm helpers *******/
1909
1910 /* Get length of entire message, including header */
1911 int kuc_len(int payload_len)
1912 {
1913         return sizeof(struct kuc_hdr) + payload_len;
1914 }
1915 EXPORT_SYMBOL(kuc_len);
1916
1917 /* Get a pointer to kuc header, given a ptr to the payload
1918  * @param p Pointer to payload area
1919  * @returns Pointer to kuc header
1920  */
1921 struct kuc_hdr * kuc_ptr(void *p)
1922 {
1923         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1924         LASSERT(lh->kuc_magic == KUC_MAGIC);
1925         return lh;
1926 }
1927 EXPORT_SYMBOL(kuc_ptr);
1928
1929 /* Alloc space for a message, and fill in header
1930  * @return Pointer to payload area
1931  */
1932 void *kuc_alloc(int payload_len, int transport, int type)
1933 {
1934         struct kuc_hdr *lh;
1935         int len = kuc_len(payload_len);
1936
1937         OBD_ALLOC(lh, len);
1938         if (lh == NULL)
1939                 return ERR_PTR(-ENOMEM);
1940
1941         lh->kuc_magic = KUC_MAGIC;
1942         lh->kuc_transport = transport;
1943         lh->kuc_msgtype = type;
1944         lh->kuc_msglen = len;
1945
1946         return (void *)(lh + 1);
1947 }
1948 EXPORT_SYMBOL(kuc_alloc);
1949
/* Free a message given a pointer to its payload area @p; @payload_len is
 * used to compute the size of the allocation being released. */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh;

	lh = kuc_ptr(p);
	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1957
1958 struct obd_request_slot_waiter {
1959         struct list_head        orsw_entry;
1960         wait_queue_head_t       orsw_waitq;
1961         bool                    orsw_signaled;
1962 };
1963
1964 static bool obd_request_slot_avail(struct client_obd *cli,
1965                                    struct obd_request_slot_waiter *orsw)
1966 {
1967         bool avail;
1968
1969         spin_lock(&cli->cl_loi_list_lock);
1970         avail = !!list_empty(&orsw->orsw_entry);
1971         spin_unlock(&cli->cl_loi_list_lock);
1972
1973         return avail;
1974 };
1975
1976 /*
1977  * For network flow control, the RPC sponsor needs to acquire a credit
1978  * before sending the RPC. The credits count for a connection is defined
1979  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1980  * the subsequent RPC sponsors need to wait until others released their
1981  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1982  */
1983 int obd_get_request_slot(struct client_obd *cli)
1984 {
1985         struct obd_request_slot_waiter   orsw;
1986         int                              rc;
1987
1988         spin_lock(&cli->cl_loi_list_lock);
1989         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
1990                 cli->cl_rpcs_in_flight++;
1991                 spin_unlock(&cli->cl_loi_list_lock);
1992                 return 0;
1993         }
1994
1995         init_waitqueue_head(&orsw.orsw_waitq);
1996         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
1997         orsw.orsw_signaled = false;
1998         spin_unlock(&cli->cl_loi_list_lock);
1999
2000         rc = l_wait_event_abortable(orsw.orsw_waitq,
2001                                     obd_request_slot_avail(cli, &orsw) ||
2002                                     orsw.orsw_signaled);
2003
2004         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2005          * freed but other (such as obd_put_request_slot) is using it. */
2006         spin_lock(&cli->cl_loi_list_lock);
2007         if (rc != 0) {
2008                 if (!orsw.orsw_signaled) {
2009                         if (list_empty(&orsw.orsw_entry))
2010                                 cli->cl_rpcs_in_flight--;
2011                         else
2012                                 list_del(&orsw.orsw_entry);
2013                 }
2014                 rc = -EINTR;
2015         }
2016
2017         if (orsw.orsw_signaled) {
2018                 LASSERT(list_empty(&orsw.orsw_entry));
2019
2020                 rc = -EINTR;
2021         }
2022         spin_unlock(&cli->cl_loi_list_lock);
2023
2024         return rc;
2025 }
2026 EXPORT_SYMBOL(obd_get_request_slot);
2027
2028 void obd_put_request_slot(struct client_obd *cli)
2029 {
2030         struct obd_request_slot_waiter *orsw;
2031
2032         spin_lock(&cli->cl_loi_list_lock);
2033         cli->cl_rpcs_in_flight--;
2034
2035         /* If there is free slot, wakeup the first waiter. */
2036         if (!list_empty(&cli->cl_flight_waiters) &&
2037             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2038                 orsw = list_first_entry(&cli->cl_flight_waiters,
2039                                         struct obd_request_slot_waiter,
2040                                         orsw_entry);
2041                 list_del_init(&orsw->orsw_entry);
2042                 cli->cl_rpcs_in_flight++;
2043                 wake_up(&orsw->orsw_waitq);
2044         }
2045         spin_unlock(&cli->cl_loi_list_lock);
2046 }
2047 EXPORT_SYMBOL(obd_put_request_slot);
2048
2049 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2050 {
2051         return cli->cl_max_rpcs_in_flight;
2052 }
2053 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2054
2055 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2056 {
2057         struct obd_request_slot_waiter *orsw;
2058         __u32                           old;
2059         int                             diff;
2060         int                             i;
2061         const char *type_name;
2062         int                             rc;
2063
2064         if (max > OBD_MAX_RIF_MAX || max < 1)
2065                 return -ERANGE;
2066
2067         type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2068         if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2069                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2070                  * strictly lower that max_rpcs_in_flight */
2071                 if (max < 2) {
2072                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2073                                "because it must be higher than "
2074                                "max_mod_rpcs_in_flight value",
2075                                cli->cl_import->imp_obd->obd_name);
2076                         return -ERANGE;
2077                 }
2078                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2079                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2080                         if (rc != 0)
2081                                 return rc;
2082                 }
2083         }
2084
2085         spin_lock(&cli->cl_loi_list_lock);
2086         old = cli->cl_max_rpcs_in_flight;
2087         cli->cl_max_rpcs_in_flight = max;
2088         client_adjust_max_dirty(cli);
2089
2090         diff = max - old;
2091
2092         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2093         for (i = 0; i < diff; i++) {
2094                 if (list_empty(&cli->cl_flight_waiters))
2095                         break;
2096
2097                 orsw = list_first_entry(&cli->cl_flight_waiters,
2098                                         struct obd_request_slot_waiter,
2099                                         orsw_entry);
2100                 list_del_init(&orsw->orsw_entry);
2101                 cli->cl_rpcs_in_flight++;
2102                 wake_up(&orsw->orsw_waitq);
2103         }
2104         spin_unlock(&cli->cl_loi_list_lock);
2105
2106         return 0;
2107 }
2108 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2109
2110 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2111 {
2112         return cli->cl_max_mod_rpcs_in_flight;
2113 }
2114 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2115
2116 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2117 {
2118         struct obd_connect_data *ocd;
2119         __u16 maxmodrpcs;
2120         __u16 prev;
2121
2122         if (max > OBD_MAX_RIF_MAX || max < 1)
2123                 return -ERANGE;
2124
2125         /* cannot exceed or equal max_rpcs_in_flight */
2126         if (max >= cli->cl_max_rpcs_in_flight) {
2127                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2128                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2129                        cli->cl_import->imp_obd->obd_name,
2130                        max, cli->cl_max_rpcs_in_flight);
2131                 return -ERANGE;
2132         }
2133
2134         /* cannot exceed max modify RPCs in flight supported by the server */
2135         ocd = &cli->cl_import->imp_connect_data;
2136         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2137                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2138         else
2139                 maxmodrpcs = 1;
2140         if (max > maxmodrpcs) {
2141                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2142                        "higher than max_mod_rpcs_per_client value (%hu) "
2143                        "returned by the server at connection\n",
2144                        cli->cl_import->imp_obd->obd_name,
2145                        max, maxmodrpcs);
2146                 return -ERANGE;
2147         }
2148
2149         spin_lock(&cli->cl_mod_rpcs_lock);
2150
2151         prev = cli->cl_max_mod_rpcs_in_flight;
2152         cli->cl_max_mod_rpcs_in_flight = max;
2153
2154         /* wakeup waiters if limit has been increased */
2155         if (cli->cl_max_mod_rpcs_in_flight > prev)
2156                 wake_up(&cli->cl_mod_rpcs_waitq);
2157
2158         spin_unlock(&cli->cl_mod_rpcs_lock);
2159
2160         return 0;
2161 }
2162 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2163
2164 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2165                                struct seq_file *seq)
2166 {
2167         unsigned long mod_tot = 0, mod_cum;
2168         struct timespec64 now;
2169         int i;
2170
2171         ktime_get_real_ts64(&now);
2172
2173         spin_lock(&cli->cl_mod_rpcs_lock);
2174
2175         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2176                    (s64)now.tv_sec, now.tv_nsec);
2177         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2178                    cli->cl_mod_rpcs_in_flight);
2179
2180         seq_printf(seq, "\n\t\t\tmodify\n");
2181         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2182
2183         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2184
2185         mod_cum = 0;
2186         for (i = 0; i < OBD_HIST_MAX; i++) {
2187                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2188                 mod_cum += mod;
2189                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2190                            i, mod, pct(mod, mod_tot),
2191                            pct(mod_cum, mod_tot));
2192                 if (mod_cum == mod_tot)
2193                         break;
2194         }
2195
2196         spin_unlock(&cli->cl_mod_rpcs_lock);
2197
2198         return 0;
2199 }
2200 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2201
2202 /* The number of modify RPCs sent in parallel is limited
2203  * because the server has a finite number of slots per client to
2204  * store request result and ensure reply reconstruction when needed.
2205  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2206  * that takes into account server limit and cl_max_rpcs_in_flight
2207  * value.
2208  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2209  * one close request is allowed above the maximum.
2210  */
2211 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2212                                                  bool close_req)
2213 {
2214         bool avail;
2215
2216         /* A slot is available if
2217          * - number of modify RPCs in flight is less than the max
2218          * - it's a close RPC and no other close request is in flight
2219          */
2220         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2221                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2222
2223         return avail;
2224 }
2225
2226 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2227                                          bool close_req)
2228 {
2229         bool avail;
2230
2231         spin_lock(&cli->cl_mod_rpcs_lock);
2232         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2233         spin_unlock(&cli->cl_mod_rpcs_lock);
2234         return avail;
2235 }
2236
2237
2238 /* Get a modify RPC slot from the obd client @cli according
2239  * to the kind of operation @opc that is going to be sent
2240  * and the intent @it of the operation if it applies.
2241  * If the maximum number of modify RPCs in flight is reached
2242  * the thread is put to sleep.
2243  * Returns the tag to be set in the request message. Tag 0
2244  * is reserved for non-modifying requests.
2245  */
2246 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2247 {
2248         bool                    close_req = false;
2249         __u16                   i, max;
2250
2251         if (opc == MDS_CLOSE)
2252                 close_req = true;
2253
2254         do {
2255                 spin_lock(&cli->cl_mod_rpcs_lock);
2256                 max = cli->cl_max_mod_rpcs_in_flight;
2257                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2258                         /* there is a slot available */
2259                         cli->cl_mod_rpcs_in_flight++;
2260                         if (close_req)
2261                                 cli->cl_close_rpcs_in_flight++;
2262                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2263                                          cli->cl_mod_rpcs_in_flight);
2264                         /* find a free tag */
2265                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2266                                                 max + 1);
2267                         LASSERT(i < OBD_MAX_RIF_MAX);
2268                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2269                         spin_unlock(&cli->cl_mod_rpcs_lock);
2270                         /* tag 0 is reserved for non-modify RPCs */
2271
2272                         CDEBUG(D_RPCTRACE,
2273                                "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2274                                cli->cl_import->imp_obd->obd_name,
2275                                i + 1, opc, max);
2276
2277                         return i + 1;
2278                 }
2279                 spin_unlock(&cli->cl_mod_rpcs_lock);
2280
2281                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2282                        "opc %u, max %hu\n",
2283                        cli->cl_import->imp_obd->obd_name, opc, max);
2284
2285                 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2286                                           obd_mod_rpc_slot_avail(cli,
2287                                                                  close_req));
2288         } while (true);
2289 }
2290 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2291
2292 /* Put a modify RPC slot from the obd client @cli according
2293  * to the kind of operation @opc that has been sent.
2294  */
2295 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2296 {
2297         bool                    close_req = false;
2298
2299         if (tag == 0)
2300                 return;
2301
2302         if (opc == MDS_CLOSE)
2303                 close_req = true;
2304
2305         spin_lock(&cli->cl_mod_rpcs_lock);
2306         cli->cl_mod_rpcs_in_flight--;
2307         if (close_req)
2308                 cli->cl_close_rpcs_in_flight--;
2309         /* release the tag in the bitmap */
2310         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2311         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2312         spin_unlock(&cli->cl_mod_rpcs_lock);
2313         wake_up(&cli->cl_mod_rpcs_waitq);
2314 }
2315 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2316