Whamcloud - gitweb
37b6bf246f456ca28a5d6280ceac77e1f0631d70
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/obdclass/genops.c
32  *
33  * These are the only exported functions, they provide some generic
34  * infrastructure for managing object devices
35  */
36
37 #define DEBUG_SUBSYSTEM S_CLASS
38
39 #include <linux/pid_namespace.h>
40 #include <linux/workqueue.h>
41 #include <lustre_compat.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
47
/* Guards obd_devs[]: taken for write when a slot is filled or cleared and
 * for read by the lookup helpers in this file.
 */
DEFINE_RWLOCK(obd_dev_lock);
/* Registered devices, indexed by obd_minor; an unused slot holds NULL. */
static struct obd_device *obd_devs[MAX_OBD_DEVICES];

/* Slab cache used by obd_device_alloc() and obd_device_free(). */
static struct kmem_cache *obd_device_cachep;
/* kobject type shared by all obd types; initialised further down. */
static struct kobj_type class_ktype;
static struct workqueue_struct *zombie_wq;

/* Forward declarations for helpers defined later in this file. */
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks, int debug_level);

/* Stale export bookkeeping: list head, its spinlock and an entry counter. */
static LIST_HEAD(obd_stale_exports);
static DEFINE_SPINLOCK(obd_stale_export_lock);
static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
63
64 /*
65  * support functions: we could use inter-module communication, but this
66  * is more portable to other OS's
67  */
68 static struct obd_device *obd_device_alloc(void)
69 {
70         struct obd_device *obd;
71
72         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
73         if (obd != NULL) {
74                 obd->obd_magic = OBD_DEVICE_MAGIC;
75         }
76         return obd;
77 }
78
79 static void obd_device_free(struct obd_device *obd)
80 {
81         LASSERT(obd != NULL);
82         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
83                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
84         if (obd->obd_namespace != NULL) {
85                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
86                        obd, obd->obd_namespace, obd->obd_force);
87                 LBUG();
88         }
89         lu_ref_fini(&obd->obd_reference);
90         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
91 }
92
93 struct obd_type *class_search_type(const char *name)
94 {
95         struct kobject *kobj = kset_find_obj(lustre_kset, name);
96
97         if (kobj && kobj->ktype == &class_ktype)
98                 return container_of(kobj, struct obd_type, typ_kobj);
99
100         kobject_put(kobj);
101         return NULL;
102 }
103 EXPORT_SYMBOL(class_search_type);
104
105 struct obd_type *class_get_type(const char *name)
106 {
107         struct obd_type *type;
108
109         type = class_search_type(name);
110 #ifdef HAVE_MODULE_LOADING_SUPPORT
111         if (!type) {
112                 const char *modname = name;
113
114 #ifdef HAVE_SERVER_SUPPORT
115                 if (strcmp(modname, "obdfilter") == 0)
116                         modname = "ofd";
117
118                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
119                         modname = LUSTRE_OSP_NAME;
120
121                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
122                         modname = LUSTRE_MDT_NAME;
123 #endif /* HAVE_SERVER_SUPPORT */
124
125                 if (!request_module("%s", modname)) {
126                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
127                         type = class_search_type(name);
128                 } else {
129                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
130                                            modname);
131                 }
132         }
133 #endif
134         if (type) {
135                 if (try_module_get(type->typ_dt_ops->o_owner)) {
136                         atomic_inc(&type->typ_refcnt);
137                         /* class_search_type() returned a counted reference,
138                          * but we don't need that count any more as
139                          * we have one through typ_refcnt.
140                          */
141                         kobject_put(&type->typ_kobj);
142                 } else {
143                         kobject_put(&type->typ_kobj);
144                         type = NULL;
145                 }
146         }
147         return type;
148 }
149
150 void class_put_type(struct obd_type *type)
151 {
152         LASSERT(type);
153         module_put(type->typ_dt_ops->o_owner);
154         atomic_dec(&type->typ_refcnt);
155 }
156
157 static void class_sysfs_release(struct kobject *kobj)
158 {
159         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
160
161         debugfs_remove_recursive(type->typ_debugfs_entry);
162         type->typ_debugfs_entry = NULL;
163
164         if (type->typ_lu)
165                 lu_device_type_fini(type->typ_lu);
166
167 #ifdef CONFIG_PROC_FS
168         if (type->typ_name && type->typ_procroot)
169                 remove_proc_subtree(type->typ_name, proc_lustre_root);
170 #endif
171         OBD_FREE(type, sizeof(*type));
172 }
173
/*
 * kobject type used for every obd_type: sysfs attribute access goes through
 * lustre_sysfs_ops and class_sysfs_release() runs when the kobject is
 * released.
 */
static struct kobj_type class_ktype = {
        .sysfs_ops      = &lustre_sysfs_ops,
        .release        = class_sysfs_release,
};
178
#ifdef HAVE_SERVER_SUPPORT
/**
 * Create a placeholder obd type that only provides the sysfs, debugfs and
 * (optionally) proc directories for \a name.
 *
 * The placeholder is flagged with typ_sym_filter so that a later
 * class_register_type() call for the same name can take it over instead of
 * failing with -EEXIST.
 *
 * \param[in] name         name of the type directory to create
 * \param[in] enable_proc  also register a compat proc directory
 *
 * \retval type             newly created placeholder type
 * \retval ERR_PTR(-EEXIST) a type with this name already exists
 * \retval ERR_PTR(-ENOMEM) allocation failed
 * \retval ERR_PTR(errno)   kobject registration failed
 */
struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
{
        struct dentry *symlink;
        struct obd_type *type;
        int rc;

        type = class_search_type(name);
        if (type) {
                kobject_put(&type->typ_kobj);
                return ERR_PTR(-EEXIST);
        }

        OBD_ALLOC(type, sizeof(*type));
        if (!type)
                return ERR_PTR(-ENOMEM);

        type->typ_kobj.kset = lustre_kset;
        rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
                                  &lustre_kset->kobj, "%s", name);
        if (rc) {
                /* A failed kobject_init_and_add() still leaves an initialised
                 * kobject behind; dropping its reference runs
                 * class_sysfs_release(), which frees \a type.
                 */
                kobject_put(&type->typ_kobj);
                return ERR_PTR(rc);
        }

        symlink = debugfs_create_dir(name, debugfs_lustre_root);
        type->typ_debugfs_entry = symlink;
        type->typ_sym_filter = true;

        if (enable_proc) {
                type->typ_procroot = lprocfs_register(name, proc_lustre_root,
                                                      NULL, NULL);
                if (IS_ERR(type->typ_procroot)) {
                        /* the proc directory is optional, carry on without */
                        CERROR("%s: can't create compat proc entry: %d\n",
                               name, (int)PTR_ERR(type->typ_procroot));
                        type->typ_procroot = NULL;
                }
        }

        return type;
}
EXPORT_SYMBOL(class_add_symlinks);
#endif /* HAVE_SERVER_SUPPORT */
220
221 #define CLASS_MAX_NAME 1024
222
223 int class_register_type(const struct obd_ops *dt_ops,
224                         const struct md_ops *md_ops,
225                         bool enable_proc,
226                         const char *name, struct lu_device_type *ldt)
227 {
228         struct obd_type *type;
229         int rc;
230
231         ENTRY;
232         /* sanity check */
233         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
234
235         type = class_search_type(name);
236         if (type) {
237 #ifdef HAVE_SERVER_SUPPORT
238                 if (type->typ_sym_filter)
239                         goto dir_exist;
240 #endif /* HAVE_SERVER_SUPPORT */
241                 kobject_put(&type->typ_kobj);
242                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
243                 RETURN(-EEXIST);
244         }
245
246         OBD_ALLOC(type, sizeof(*type));
247         if (type == NULL)
248                 RETURN(-ENOMEM);
249
250         type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
251         type->typ_kobj.kset = lustre_kset;
252         kobject_init(&type->typ_kobj, &class_ktype);
253 #ifdef HAVE_SERVER_SUPPORT
254 dir_exist:
255 #endif /* HAVE_SERVER_SUPPORT */
256
257         type->typ_dt_ops = dt_ops;
258         type->typ_md_ops = md_ops;
259
260 #ifdef HAVE_SERVER_SUPPORT
261         if (type->typ_sym_filter) {
262                 type->typ_sym_filter = false;
263                 kobject_put(&type->typ_kobj);
264                 goto setup_ldt;
265         }
266 #endif
267 #ifdef CONFIG_PROC_FS
268         if (enable_proc && !type->typ_procroot) {
269                 type->typ_procroot = lprocfs_register(name,
270                                                       proc_lustre_root,
271                                                       NULL, type);
272                 if (IS_ERR(type->typ_procroot)) {
273                         rc = PTR_ERR(type->typ_procroot);
274                         type->typ_procroot = NULL;
275                         GOTO(failed, rc);
276                 }
277         }
278 #endif
279         type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
280
281         rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
282         if (rc)
283                 GOTO(failed, rc);
284 #ifdef HAVE_SERVER_SUPPORT
285 setup_ldt:
286 #endif
287         if (ldt) {
288                 rc = lu_device_type_init(ldt);
289                 smp_store_release(&type->typ_lu, rc ? NULL : ldt);
290                 wake_up_var(&type->typ_lu);
291                 if (rc)
292                         GOTO(failed, rc);
293         }
294
295         RETURN(0);
296
297 failed:
298         kobject_put(&type->typ_kobj);
299
300         RETURN(rc);
301 }
302 EXPORT_SYMBOL(class_register_type);
303
304 int class_unregister_type(const char *name)
305 {
306         struct obd_type *type = class_search_type(name);
307         int rc = 0;
308         ENTRY;
309
310         if (!type) {
311                 CERROR("unknown obd type\n");
312                 RETURN(-EINVAL);
313         }
314
315         if (atomic_read(&type->typ_refcnt)) {
316                 CERROR("type %s has refcount (%d)\n", name,
317                        atomic_read(&type->typ_refcnt));
318                 /* This is a bad situation, let's make the best of it */
319                 /* Remove ops, but leave the name for debugging */
320                 type->typ_dt_ops = NULL;
321                 type->typ_md_ops = NULL;
322                 GOTO(out_put, rc = -EBUSY);
323         }
324
325         /* Put the final ref */
326         kobject_put(&type->typ_kobj);
327 out_put:
328         /* Put the ref returned by class_search_type() */
329         kobject_put(&type->typ_kobj);
330
331         RETURN(rc);
332 } /* class_unregister_type */
333 EXPORT_SYMBOL(class_unregister_type);
334
335 /**
336  * Create a new obd device.
337  *
338  * Allocate the new obd_device and initialize it.
339  *
340  * \param[in] type_name obd device type string.
341  * \param[in] name      obd device name.
342  * \param[in] uuid      obd device UUID
343  *
344  * \retval newdev         pointer to created obd_device
345  * \retval ERR_PTR(errno) on error
346  */
347 struct obd_device *class_newdev(const char *type_name, const char *name,
348                                 const char *uuid)
349 {
350         struct obd_device *newdev;
351         struct obd_type *type = NULL;
352         ENTRY;
353
354         if (strlen(name) >= MAX_OBD_NAME) {
355                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
356                 RETURN(ERR_PTR(-EINVAL));
357         }
358
359         type = class_get_type(type_name);
360         if (type == NULL){
361                 CERROR("OBD: unknown type: %s\n", type_name);
362                 RETURN(ERR_PTR(-ENODEV));
363         }
364
365         newdev = obd_device_alloc();
366         if (newdev == NULL) {
367                 class_put_type(type);
368                 RETURN(ERR_PTR(-ENOMEM));
369         }
370         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
371         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
372         newdev->obd_type = type;
373         newdev->obd_minor = -1;
374
375         rwlock_init(&newdev->obd_pool_lock);
376         newdev->obd_pool_limit = 0;
377         newdev->obd_pool_slv = 0;
378
379         INIT_LIST_HEAD(&newdev->obd_exports);
380         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
381         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
382         INIT_LIST_HEAD(&newdev->obd_exports_timed);
383         INIT_LIST_HEAD(&newdev->obd_nid_stats);
384         spin_lock_init(&newdev->obd_nid_lock);
385         spin_lock_init(&newdev->obd_dev_lock);
386         mutex_init(&newdev->obd_dev_mutex);
387         spin_lock_init(&newdev->obd_osfs_lock);
388         /* newdev->obd_osfs_age must be set to a value in the distant
389          * past to guarantee a fresh statfs is fetched on mount. */
390         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
391
392         /* XXX belongs in setup not attach  */
393         init_rwsem(&newdev->obd_observer_link_sem);
394         /* recovery data */
395         spin_lock_init(&newdev->obd_recovery_task_lock);
396         init_waitqueue_head(&newdev->obd_next_transno_waitq);
397         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
398         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
399         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
400         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
401         INIT_LIST_HEAD(&newdev->obd_evict_list);
402         INIT_LIST_HEAD(&newdev->obd_lwp_list);
403
404         llog_group_init(&newdev->obd_olg);
405         /* Detach drops this */
406         atomic_set(&newdev->obd_refcount, 1);
407         lu_ref_init(&newdev->obd_reference);
408         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
409
410         newdev->obd_conn_inprogress = 0;
411
412         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
413
414         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
415                newdev->obd_name, newdev);
416
417         return newdev;
418 }
419
420 /**
421  * Free obd device.
422  *
423  * \param[in] obd obd_device to be freed
424  *
425  * \retval none
426  */
427 void class_free_dev(struct obd_device *obd)
428 {
429         struct obd_type *obd_type = obd->obd_type;
430
431         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
432                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
433         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
434                  "obd %p != obd_devs[%d] %p\n",
435                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
436         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
437                  "obd_refcount should be 0, not %d\n",
438                  atomic_read(&obd->obd_refcount));
439         LASSERT(obd_type != NULL);
440
441         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
442                obd->obd_name, obd->obd_type->typ_name);
443
444         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
445                          obd->obd_name, obd->obd_uuid.uuid);
446         if (obd->obd_stopping) {
447                 int err;
448
449                 /* If we're not stopping, we were never set up */
450                 err = obd_cleanup(obd);
451                 if (err)
452                         CERROR("Cleanup %s returned %d\n",
453                                 obd->obd_name, err);
454         }
455
456         obd_device_free(obd);
457
458         class_put_type(obd_type);
459 }
460
461 /**
462  * Unregister obd device.
463  *
464  * Free slot in obd_dev[] used by \a obd.
465  *
466  * \param[in] new_obd obd_device to be unregistered
467  *
468  * \retval none
469  */
470 void class_unregister_device(struct obd_device *obd)
471 {
472         write_lock(&obd_dev_lock);
473         if (obd->obd_minor >= 0) {
474                 LASSERT(obd_devs[obd->obd_minor] == obd);
475                 obd_devs[obd->obd_minor] = NULL;
476                 obd->obd_minor = -1;
477         }
478         write_unlock(&obd_dev_lock);
479 }
480
481 /**
482  * Register obd device.
483  *
484  * Find free slot in obd_devs[], fills it with \a new_obd.
485  *
486  * \param[in] new_obd obd_device to be registered
487  *
488  * \retval 0          success
489  * \retval -EEXIST    device with this name is registered
490  * \retval -EOVERFLOW obd_devs[] is full
491  */
492 int class_register_device(struct obd_device *new_obd)
493 {
494         int ret = 0;
495         int i;
496         int new_obd_minor = 0;
497         bool minor_assign = false;
498         bool retried = false;
499
500 again:
501         write_lock(&obd_dev_lock);
502         for (i = 0; i < class_devno_max(); i++) {
503                 struct obd_device *obd = class_num2obd(i);
504
505                 if (obd != NULL &&
506                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
507
508                         if (!retried) {
509                                 write_unlock(&obd_dev_lock);
510
511                                 /* the obd_device could be waited to be
512                                  * destroyed by the "obd_zombie_impexp_thread".
513                                  */
514                                 obd_zombie_barrier();
515                                 retried = true;
516                                 goto again;
517                         }
518
519                         CERROR("%s: already exists, won't add\n",
520                                obd->obd_name);
521                         /* in case we found a free slot before duplicate */
522                         minor_assign = false;
523                         ret = -EEXIST;
524                         break;
525                 }
526                 if (!minor_assign && obd == NULL) {
527                         new_obd_minor = i;
528                         minor_assign = true;
529                 }
530         }
531
532         if (minor_assign) {
533                 new_obd->obd_minor = new_obd_minor;
534                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
535                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
536                 obd_devs[new_obd_minor] = new_obd;
537         } else {
538                 if (ret == 0) {
539                         ret = -EOVERFLOW;
540                         CERROR("%s: all %u/%u devices used, increase "
541                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
542                                i, class_devno_max(), ret);
543                 }
544         }
545         write_unlock(&obd_dev_lock);
546
547         RETURN(ret);
548 }
549
550 static int class_name2dev_nolock(const char *name)
551 {
552         int i;
553
554         if (!name)
555                 return -1;
556
557         for (i = 0; i < class_devno_max(); i++) {
558                 struct obd_device *obd = class_num2obd(i);
559
560                 if (obd && strcmp(name, obd->obd_name) == 0) {
561                         /* Make sure we finished attaching before we give
562                            out any references */
563                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
564                         if (obd->obd_attached) {
565                                 return i;
566                         }
567                         break;
568                 }
569         }
570
571         return -1;
572 }
573
574 int class_name2dev(const char *name)
575 {
576         int i;
577
578         if (!name)
579                 return -1;
580
581         read_lock(&obd_dev_lock);
582         i = class_name2dev_nolock(name);
583         read_unlock(&obd_dev_lock);
584
585         return i;
586 }
587 EXPORT_SYMBOL(class_name2dev);
588
589 struct obd_device *class_name2obd(const char *name)
590 {
591         int dev = class_name2dev(name);
592
593         if (dev < 0 || dev > class_devno_max())
594                 return NULL;
595         return class_num2obd(dev);
596 }
597 EXPORT_SYMBOL(class_name2obd);
598
599 int class_uuid2dev_nolock(struct obd_uuid *uuid)
600 {
601         int i;
602
603         for (i = 0; i < class_devno_max(); i++) {
604                 struct obd_device *obd = class_num2obd(i);
605
606                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
607                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
608                         return i;
609                 }
610         }
611
612         return -1;
613 }
614
615 int class_uuid2dev(struct obd_uuid *uuid)
616 {
617         int i;
618
619         read_lock(&obd_dev_lock);
620         i = class_uuid2dev_nolock(uuid);
621         read_unlock(&obd_dev_lock);
622
623         return i;
624 }
625 EXPORT_SYMBOL(class_uuid2dev);
626
627 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
628 {
629         int dev = class_uuid2dev(uuid);
630         if (dev < 0)
631                 return NULL;
632         return class_num2obd(dev);
633 }
634 EXPORT_SYMBOL(class_uuid2obd);
635
636 /**
637  * Get obd device from ::obd_devs[]
638  *
639  * \param num [in] array index
640  *
641  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
642  *         otherwise return the obd device there.
643  */
644 struct obd_device *class_num2obd(int num)
645 {
646         struct obd_device *obd = NULL;
647
648         if (num < class_devno_max()) {
649                 obd = obd_devs[num];
650                 if (obd == NULL)
651                         return NULL;
652
653                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
654                          "%p obd_magic %08x != %08x\n",
655                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
656                 LASSERTF(obd->obd_minor == num,
657                          "%p obd_minor %0d != %0d\n",
658                          obd, obd->obd_minor, num);
659         }
660
661         return obd;
662 }
663 EXPORT_SYMBOL(class_num2obd);
664
665 /**
666  * Find obd in obd_dev[] by name or uuid.
667  *
668  * Increment obd's refcount if found.
669  *
670  * \param[in] str obd name or uuid
671  *
672  * \retval NULL    if not found
673  * \retval target  pointer to found obd_device
674  */
675 struct obd_device *class_dev_by_str(const char *str)
676 {
677         struct obd_device *target = NULL;
678         struct obd_uuid tgtuuid;
679         int rc;
680
681         obd_str2uuid(&tgtuuid, str);
682
683         read_lock(&obd_dev_lock);
684         rc = class_uuid2dev_nolock(&tgtuuid);
685         if (rc < 0)
686                 rc = class_name2dev_nolock(str);
687
688         if (rc >= 0)
689                 target = class_num2obd(rc);
690
691         if (target != NULL)
692                 class_incref(target, "find", current);
693         read_unlock(&obd_dev_lock);
694
695         RETURN(target);
696 }
697 EXPORT_SYMBOL(class_dev_by_str);
698
699 /**
700  * Get obd devices count. Device in any
701  *    state are counted
702  * \retval obd device count
703  */
704 int get_devices_count(void)
705 {
706         int index, max_index = class_devno_max(), dev_count = 0;
707
708         read_lock(&obd_dev_lock);
709         for (index = 0; index <= max_index; index++) {
710                 struct obd_device *obd = class_num2obd(index);
711                 if (obd != NULL)
712                         dev_count++;
713         }
714         read_unlock(&obd_dev_lock);
715
716         return dev_count;
717 }
718 EXPORT_SYMBOL(get_devices_count);
719
720 void class_obd_list(void)
721 {
722         char *status;
723         int i;
724
725         read_lock(&obd_dev_lock);
726         for (i = 0; i < class_devno_max(); i++) {
727                 struct obd_device *obd = class_num2obd(i);
728
729                 if (obd == NULL)
730                         continue;
731                 if (obd->obd_stopping)
732                         status = "ST";
733                 else if (obd->obd_set_up)
734                         status = "UP";
735                 else if (obd->obd_attached)
736                         status = "AT";
737                 else
738                         status = "--";
739                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
740                          i, status, obd->obd_type->typ_name,
741                          obd->obd_name, obd->obd_uuid.uuid,
742                          atomic_read(&obd->obd_refcount));
743         }
744         read_unlock(&obd_dev_lock);
745 }
746
747 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
748  * specified, then only the client with that uuid is returned,
749  * otherwise any client connected to the tgt is returned.
750  */
751 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
752                                          const char *type_name,
753                                          struct obd_uuid *grp_uuid)
754 {
755         int i;
756
757         read_lock(&obd_dev_lock);
758         for (i = 0; i < class_devno_max(); i++) {
759                 struct obd_device *obd = class_num2obd(i);
760
761                 if (obd == NULL)
762                         continue;
763                 if ((strncmp(obd->obd_type->typ_name, type_name,
764                              strlen(type_name)) == 0)) {
765                         if (obd_uuid_equals(tgt_uuid,
766                                             &obd->u.cli.cl_target_uuid) &&
767                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
768                                                          &obd->obd_uuid) : 1)) {
769                                 read_unlock(&obd_dev_lock);
770                                 return obd;
771                         }
772                 }
773         }
774         read_unlock(&obd_dev_lock);
775
776         return NULL;
777 }
778 EXPORT_SYMBOL(class_find_client_obd);
779
780 /* Iterate the obd_device list looking devices have grp_uuid. Start
781  * searching at *next, and if a device is found, the next index to look
782  * at is saved in *next. If next is NULL, then the first matching device
783  * will always be returned.
784  */
785 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
786 {
787         int i;
788
789         if (next == NULL)
790                 i = 0;
791         else if (*next >= 0 && *next < class_devno_max())
792                 i = *next;
793         else
794                 return NULL;
795
796         read_lock(&obd_dev_lock);
797         for (; i < class_devno_max(); i++) {
798                 struct obd_device *obd = class_num2obd(i);
799
800                 if (obd == NULL)
801                         continue;
802                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
803                         if (next != NULL)
804                                 *next = i+1;
805                         read_unlock(&obd_dev_lock);
806                         return obd;
807                 }
808         }
809         read_unlock(&obd_dev_lock);
810
811         return NULL;
812 }
813 EXPORT_SYMBOL(class_devices_in_group);
814
815 /**
816  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
817  * adjust sptlrpc settings accordingly.
818  */
819 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
820 {
821         struct obd_device  *obd;
822         const char         *type;
823         int                 i, rc = 0, rc2;
824
825         LASSERT(namelen > 0);
826
827         read_lock(&obd_dev_lock);
828         for (i = 0; i < class_devno_max(); i++) {
829                 obd = class_num2obd(i);
830
831                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
832                         continue;
833
834                 /* only notify mdc, osc, osp, lwp, mdt, ost
835                  * because only these have a -sptlrpc llog */
836                 type = obd->obd_type->typ_name;
837                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
838                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
839                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
840                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
841                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
842                     strcmp(type, LUSTRE_OST_NAME) != 0)
843                         continue;
844
845                 if (strncmp(obd->obd_name, fsname, namelen))
846                         continue;
847
848                 class_incref(obd, __FUNCTION__, obd);
849                 read_unlock(&obd_dev_lock);
850                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
851                                          sizeof(KEY_SPTLRPC_CONF),
852                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
853                 rc = rc ? rc : rc2;
854                 class_decref(obd, __FUNCTION__, obd);
855                 read_lock(&obd_dev_lock);
856         }
857         read_unlock(&obd_dev_lock);
858         return rc;
859 }
860 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
861
862 void obd_cleanup_caches(void)
863 {
864         ENTRY;
865         if (obd_device_cachep) {
866                 kmem_cache_destroy(obd_device_cachep);
867                 obd_device_cachep = NULL;
868         }
869
870         EXIT;
871 }
872
873 int obd_init_caches(void)
874 {
875         int rc;
876         ENTRY;
877
878         LASSERT(obd_device_cachep == NULL);
879         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
880                                 sizeof(struct obd_device),
881                                 0, 0, 0, sizeof(struct obd_device), NULL);
882         if (!obd_device_cachep)
883                 GOTO(out, rc = -ENOMEM);
884
885         RETURN(0);
886 out:
887         obd_cleanup_caches();
888         RETURN(rc);
889 }
890
891 static const char export_handle_owner[] = "export";
892
893 /* map connection to client */
894 struct obd_export *class_conn2export(struct lustre_handle *conn)
895 {
896         struct obd_export *export;
897         ENTRY;
898
899         if (!conn) {
900                 CDEBUG(D_CACHE, "looking for null handle\n");
901                 RETURN(NULL);
902         }
903
904         if (conn->cookie == -1) {  /* this means assign a new connection */
905                 CDEBUG(D_CACHE, "want a new connection\n");
906                 RETURN(NULL);
907         }
908
909         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
910         export = class_handle2object(conn->cookie, export_handle_owner);
911         RETURN(export);
912 }
913 EXPORT_SYMBOL(class_conn2export);
914
915 struct obd_device *class_exp2obd(struct obd_export *exp)
916 {
917         if (exp)
918                 return exp->exp_obd;
919         return NULL;
920 }
921 EXPORT_SYMBOL(class_exp2obd);
922
923 struct obd_import *class_exp2cliimp(struct obd_export *exp)
924 {
925         struct obd_device *obd = exp->exp_obd;
926         if (obd == NULL)
927                 return NULL;
928         return obd->u.cli.cl_import;
929 }
930 EXPORT_SYMBOL(class_exp2cliimp);
931
932 /* Export management functions */
933 static void class_export_destroy(struct obd_export *exp)
934 {
935         struct obd_device *obd = exp->exp_obd;
936         ENTRY;
937
938         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
939         LASSERT(obd != NULL);
940
941         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
942                exp->exp_client_uuid.uuid, obd->obd_name);
943
944         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
945         ptlrpc_connection_put(exp->exp_connection);
946
947         LASSERT(list_empty(&exp->exp_outstanding_replies));
948         LASSERT(list_empty(&exp->exp_uncommitted_replies));
949         LASSERT(list_empty(&exp->exp_req_replay_queue));
950         LASSERT(list_empty(&exp->exp_hp_rpcs));
951         obd_destroy_export(exp);
952         /* self export doesn't hold a reference to an obd, although it
953          * exists until freeing of the obd */
954         if (exp != obd->obd_self_export)
955                 class_decref(obd, "export", exp);
956
957         OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
958         kfree_rcu(exp, exp_handle.h_rcu);
959         EXIT;
960 }
961
962 struct obd_export *class_export_get(struct obd_export *exp)
963 {
964         refcount_inc(&exp->exp_handle.h_ref);
965         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
966                refcount_read(&exp->exp_handle.h_ref));
967         return exp;
968 }
969 EXPORT_SYMBOL(class_export_get);
970
971 void class_export_put(struct obd_export *exp)
972 {
973         LASSERT(exp != NULL);
974         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
975         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
976         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
977                refcount_read(&exp->exp_handle.h_ref) - 1);
978
979         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
980                 struct obd_device *obd = exp->exp_obd;
981
982                 CDEBUG(D_IOCTL, "final put %p/%s\n",
983                        exp, exp->exp_client_uuid.uuid);
984
985                 /* release nid stat refererence */
986                 lprocfs_exp_cleanup(exp);
987
988                 if (exp == obd->obd_self_export) {
989                         /* self export should be destroyed without
990                          * zombie thread as it doesn't hold a
991                          * reference to obd and doesn't hold any
992                          * resources */
993                         class_export_destroy(exp);
994                         /* self export is destroyed, no class
995                          * references exist and it is safe to free
996                          * obd */
997                         class_free_dev(obd);
998                 } else {
999                         LASSERT(!list_empty(&exp->exp_obd_chain));
1000                         obd_zombie_export_add(exp);
1001                 }
1002
1003         }
1004 }
1005 EXPORT_SYMBOL(class_export_put);
1006
1007 static void obd_zombie_exp_cull(struct work_struct *ws)
1008 {
1009         struct obd_export *export;
1010
1011         export = container_of(ws, struct obd_export, exp_zombie_work);
1012         class_export_destroy(export);
1013 }
1014
1015 /* Creates a new export, adds it to the hash table, and returns a
1016  * pointer to it. The refcount is 2: one for the hash reference, and
1017  * one for the pointer returned by this function. */
1018 struct obd_export *__class_new_export(struct obd_device *obd,
1019                                       struct obd_uuid *cluuid, bool is_self)
1020 {
1021         struct obd_export *export;
1022         int rc = 0;
1023         ENTRY;
1024
1025         OBD_ALLOC_PTR(export);
1026         if (!export)
1027                 return ERR_PTR(-ENOMEM);
1028
1029         export->exp_conn_cnt = 0;
1030         export->exp_lock_hash = NULL;
1031         export->exp_flock_hash = NULL;
1032         /* 2 = class_handle_hash + last */
1033         refcount_set(&export->exp_handle.h_ref, 2);
1034         atomic_set(&export->exp_rpc_count, 0);
1035         atomic_set(&export->exp_cb_count, 0);
1036         atomic_set(&export->exp_locks_count, 0);
1037 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1038         INIT_LIST_HEAD(&export->exp_locks_list);
1039         spin_lock_init(&export->exp_locks_list_guard);
1040 #endif
1041         atomic_set(&export->exp_replay_count, 0);
1042         export->exp_obd = obd;
1043         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1044         spin_lock_init(&export->exp_uncommitted_replies_lock);
1045         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1046         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1047         INIT_HLIST_NODE(&export->exp_handle.h_link);
1048         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1049         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1050         class_handle_hash(&export->exp_handle, export_handle_owner);
1051         export->exp_last_request_time = ktime_get_real_seconds();
1052         spin_lock_init(&export->exp_lock);
1053         spin_lock_init(&export->exp_rpc_lock);
1054         INIT_HLIST_NODE(&export->exp_gen_hash);
1055         spin_lock_init(&export->exp_bl_list_lock);
1056         INIT_LIST_HEAD(&export->exp_bl_list);
1057         INIT_LIST_HEAD(&export->exp_stale_list);
1058         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1059
1060         export->exp_sp_peer = LUSTRE_SP_ANY;
1061         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1062         export->exp_client_uuid = *cluuid;
1063         obd_init_export(export);
1064
1065         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1066
1067         spin_lock(&obd->obd_dev_lock);
1068         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1069                 /* shouldn't happen, but might race */
1070                 if (obd->obd_stopping)
1071                         GOTO(exit_unlock, rc = -ENODEV);
1072
1073                 rc = obd_uuid_add(obd, export);
1074                 if (rc != 0) {
1075                         LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1076                                       obd->obd_name, cluuid->uuid, rc);
1077                         GOTO(exit_unlock, rc = -EALREADY);
1078                 }
1079         }
1080
1081         if (!is_self) {
1082                 class_incref(obd, "export", export);
1083                 list_add_tail(&export->exp_obd_chain_timed,
1084                               &obd->obd_exports_timed);
1085                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1086                 obd->obd_num_exports++;
1087         } else {
1088                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1089                 INIT_LIST_HEAD(&export->exp_obd_chain);
1090         }
1091         spin_unlock(&obd->obd_dev_lock);
1092         RETURN(export);
1093
1094 exit_unlock:
1095         spin_unlock(&obd->obd_dev_lock);
1096         class_handle_unhash(&export->exp_handle);
1097         obd_destroy_export(export);
1098         OBD_FREE_PTR(export);
1099         return ERR_PTR(rc);
1100 }
1101
1102 struct obd_export *class_new_export(struct obd_device *obd,
1103                                     struct obd_uuid *uuid)
1104 {
1105         return __class_new_export(obd, uuid, false);
1106 }
1107 EXPORT_SYMBOL(class_new_export);
1108
1109 struct obd_export *class_new_export_self(struct obd_device *obd,
1110                                          struct obd_uuid *uuid)
1111 {
1112         return __class_new_export(obd, uuid, true);
1113 }
1114
1115 void class_unlink_export(struct obd_export *exp)
1116 {
1117         class_handle_unhash(&exp->exp_handle);
1118
1119         if (exp->exp_obd->obd_self_export == exp) {
1120                 class_export_put(exp);
1121                 return;
1122         }
1123
1124         spin_lock(&exp->exp_obd->obd_dev_lock);
1125         /* delete an uuid-export hashitem from hashtables */
1126         if (exp != exp->exp_obd->obd_self_export)
1127                 obd_uuid_del(exp->exp_obd, exp);
1128
1129 #ifdef HAVE_SERVER_SUPPORT
1130         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1131                 struct tg_export_data   *ted = &exp->exp_target_data;
1132                 struct cfs_hash         *hash;
1133
1134                 /* Because obd_gen_hash will not be released until
1135                  * class_cleanup(), so hash should never be NULL here */
1136                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1137                 LASSERT(hash != NULL);
1138                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1139                              &exp->exp_gen_hash);
1140                 cfs_hash_putref(hash);
1141         }
1142 #endif /* HAVE_SERVER_SUPPORT */
1143
1144         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1145         list_del_init(&exp->exp_obd_chain_timed);
1146         exp->exp_obd->obd_num_exports--;
1147         spin_unlock(&exp->exp_obd->obd_dev_lock);
1148         atomic_inc(&obd_stale_export_num);
1149
1150         /* A reference is kept by obd_stale_exports list */
1151         obd_stale_export_put(exp);
1152 }
1153 EXPORT_SYMBOL(class_unlink_export);
1154
1155 /* Import management functions */
1156 static void obd_zombie_import_free(struct obd_import *imp)
1157 {
1158         struct obd_import_conn *imp_conn;
1159
1160         ENTRY;
1161         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1162                imp->imp_obd->obd_name);
1163
1164         LASSERT(refcount_read(&imp->imp_refcount) == 0);
1165
1166         ptlrpc_connection_put(imp->imp_connection);
1167
1168         while ((imp_conn = list_first_entry_or_null(&imp->imp_conn_list,
1169                                                     struct obd_import_conn,
1170                                                     oic_item)) != NULL) {
1171                 list_del_init(&imp_conn->oic_item);
1172                 ptlrpc_connection_put(imp_conn->oic_conn);
1173                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1174         }
1175
1176         LASSERT(imp->imp_sec == NULL);
1177         LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1178                  imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1179         class_decref(imp->imp_obd, "import", imp);
1180         OBD_FREE_PTR(imp);
1181         EXIT;
1182 }
1183
1184 struct obd_import *class_import_get(struct obd_import *import)
1185 {
1186         refcount_inc(&import->imp_refcount);
1187         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1188                refcount_read(&import->imp_refcount),
1189                import->imp_obd->obd_name);
1190         return import;
1191 }
1192 EXPORT_SYMBOL(class_import_get);
1193
1194 void class_import_put(struct obd_import *imp)
1195 {
1196         ENTRY;
1197
1198         LASSERT(refcount_read(&imp->imp_refcount) > 0);
1199
1200         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1201                refcount_read(&imp->imp_refcount) - 1,
1202                imp->imp_obd->obd_name);
1203
1204         if (refcount_dec_and_test(&imp->imp_refcount)) {
1205                 CDEBUG(D_INFO, "final put import %p\n", imp);
1206                 obd_zombie_import_add(imp);
1207         }
1208
1209         EXIT;
1210 }
1211 EXPORT_SYMBOL(class_import_put);
1212
1213 static void init_imp_at(struct imp_at *at) {
1214         int i;
1215         at_init(&at->iat_net_latency, 0, 0);
1216         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1217                 /* max service estimates are tracked on the server side, so
1218                    don't use the AT history here, just use the last reported
1219                    val. (But keep hist for proc histogram, worst_ever) */
1220                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1221                         AT_FLG_NOHIST);
1222         }
1223 }
1224
1225 static void obd_zombie_imp_cull(struct work_struct *ws)
1226 {
1227         struct obd_import *import;
1228
1229         import = container_of(ws, struct obd_import, imp_zombie_work);
1230         obd_zombie_import_free(import);
1231 }
1232
1233 struct obd_import *class_new_import(struct obd_device *obd)
1234 {
1235         struct obd_import *imp;
1236         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1237
1238         OBD_ALLOC(imp, sizeof(*imp));
1239         if (imp == NULL)
1240                 return NULL;
1241
1242         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1243         INIT_LIST_HEAD(&imp->imp_replay_list);
1244         INIT_LIST_HEAD(&imp->imp_sending_list);
1245         INIT_LIST_HEAD(&imp->imp_delayed_list);
1246         INIT_LIST_HEAD(&imp->imp_committed_list);
1247         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1248         imp->imp_known_replied_xid = 0;
1249         imp->imp_replay_cursor = &imp->imp_committed_list;
1250         spin_lock_init(&imp->imp_lock);
1251         imp->imp_last_success_conn = 0;
1252         imp->imp_state = LUSTRE_IMP_NEW;
1253         imp->imp_obd = class_incref(obd, "import", imp);
1254         rwlock_init(&imp->imp_sec_lock);
1255         init_waitqueue_head(&imp->imp_recovery_waitq);
1256         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1257
1258         if (curr_pid_ns && curr_pid_ns->child_reaper)
1259                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1260         else
1261                 imp->imp_sec_refpid = 1;
1262
1263         refcount_set(&imp->imp_refcount, 2);
1264         atomic_set(&imp->imp_unregistering, 0);
1265         atomic_set(&imp->imp_reqs, 0);
1266         atomic_set(&imp->imp_inflight, 0);
1267         atomic_set(&imp->imp_replay_inflight, 0);
1268         init_waitqueue_head(&imp->imp_replay_waitq);
1269         atomic_set(&imp->imp_inval_count, 0);
1270         INIT_LIST_HEAD(&imp->imp_conn_list);
1271         init_imp_at(&imp->imp_at);
1272
1273         /* the default magic is V2, will be used in connect RPC, and
1274          * then adjusted according to the flags in request/reply. */
1275         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1276
1277         return imp;
1278 }
1279 EXPORT_SYMBOL(class_new_import);
1280
1281 void class_destroy_import(struct obd_import *import)
1282 {
1283         LASSERT(import != NULL);
1284         LASSERT(import != LP_POISON);
1285
1286         spin_lock(&import->imp_lock);
1287         import->imp_generation++;
1288         spin_unlock(&import->imp_lock);
1289         class_import_put(import);
1290 }
1291 EXPORT_SYMBOL(class_destroy_import);
1292
1293 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1294
1295 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1296 {
1297         spin_lock(&exp->exp_locks_list_guard);
1298
1299         LASSERT(lock->l_exp_refs_nr >= 0);
1300
1301         if (lock->l_exp_refs_target != NULL &&
1302             lock->l_exp_refs_target != exp) {
1303                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1304                               exp, lock, lock->l_exp_refs_target);
1305         }
1306         if ((lock->l_exp_refs_nr ++) == 0) {
1307                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1308                 lock->l_exp_refs_target = exp;
1309         }
1310         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1311                lock, exp, lock->l_exp_refs_nr);
1312         spin_unlock(&exp->exp_locks_list_guard);
1313 }
1314 EXPORT_SYMBOL(__class_export_add_lock_ref);
1315
1316 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1317 {
1318         spin_lock(&exp->exp_locks_list_guard);
1319         LASSERT(lock->l_exp_refs_nr > 0);
1320         if (lock->l_exp_refs_target != exp) {
1321                 LCONSOLE_WARN("lock %p, "
1322                               "mismatching export pointers: %p, %p\n",
1323                               lock, lock->l_exp_refs_target, exp);
1324         }
1325         if (-- lock->l_exp_refs_nr == 0) {
1326                 list_del_init(&lock->l_exp_refs_link);
1327                 lock->l_exp_refs_target = NULL;
1328         }
1329         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1330                lock, exp, lock->l_exp_refs_nr);
1331         spin_unlock(&exp->exp_locks_list_guard);
1332 }
1333 EXPORT_SYMBOL(__class_export_del_lock_ref);
1334 #endif
1335
1336 /* A connection defines an export context in which preallocation can
1337    be managed. This releases the export pointer reference, and returns
1338    the export handle, so the export refcount is 1 when this function
1339    returns. */
1340 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1341                   struct obd_uuid *cluuid)
1342 {
1343         struct obd_export *export;
1344         LASSERT(conn != NULL);
1345         LASSERT(obd != NULL);
1346         LASSERT(cluuid != NULL);
1347         ENTRY;
1348
1349         export = class_new_export(obd, cluuid);
1350         if (IS_ERR(export))
1351                 RETURN(PTR_ERR(export));
1352
1353         conn->cookie = export->exp_handle.h_cookie;
1354         class_export_put(export);
1355
1356         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1357                cluuid->uuid, conn->cookie);
1358         RETURN(0);
1359 }
1360 EXPORT_SYMBOL(class_connect);
1361
1362 /* if export is involved in recovery then clean up related things */
1363 static void class_export_recovery_cleanup(struct obd_export *exp)
1364 {
1365         struct obd_device *obd = exp->exp_obd;
1366
1367         spin_lock(&obd->obd_recovery_task_lock);
1368         if (obd->obd_recovering) {
1369                 if (exp->exp_in_recovery) {
1370                         spin_lock(&exp->exp_lock);
1371                         exp->exp_in_recovery = 0;
1372                         spin_unlock(&exp->exp_lock);
1373                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1374                         atomic_dec(&obd->obd_connected_clients);
1375                 }
1376
1377                 /* if called during recovery then should update
1378                  * obd_stale_clients counter,
1379                  * lightweight exports are not counted */
1380                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1381                         exp->exp_obd->obd_stale_clients++;
1382         }
1383         spin_unlock(&obd->obd_recovery_task_lock);
1384
1385         spin_lock(&exp->exp_lock);
1386         /** Cleanup req replay fields */
1387         if (exp->exp_req_replay_needed) {
1388                 exp->exp_req_replay_needed = 0;
1389
1390                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1391                 atomic_dec(&obd->obd_req_replay_clients);
1392         }
1393
1394         /** Cleanup lock replay data */
1395         if (exp->exp_lock_replay_needed) {
1396                 exp->exp_lock_replay_needed = 0;
1397
1398                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1399                 atomic_dec(&obd->obd_lock_replay_clients);
1400         }
1401         spin_unlock(&exp->exp_lock);
1402 }
1403
1404 /* This function removes 1-3 references from the export:
1405  * 1 - for export pointer passed
1406  * and if disconnect really need
1407  * 2 - removing from hash
1408  * 3 - in client_unlink_export
1409  * The export pointer passed to this function can destroyed */
1410 int class_disconnect(struct obd_export *export)
1411 {
1412         int already_disconnected;
1413         ENTRY;
1414
1415         if (export == NULL) {
1416                 CWARN("attempting to free NULL export %p\n", export);
1417                 RETURN(-EINVAL);
1418         }
1419
1420         spin_lock(&export->exp_lock);
1421         already_disconnected = export->exp_disconnected;
1422         export->exp_disconnected = 1;
1423 #ifdef HAVE_SERVER_SUPPORT
1424         /*  We hold references of export for uuid hash
1425          *  and nid_hash and export link at least. So
1426          *  it is safe to call rh*table_remove_fast in
1427          *  there.
1428          */
1429         obd_nid_del(export->exp_obd, export);
1430 #endif /* HAVE_SERVER_SUPPORT */
1431         spin_unlock(&export->exp_lock);
1432
1433         /* class_cleanup(), abort_recovery(), and class_fail_export()
1434          * all end up in here, and if any of them race we shouldn't
1435          * call extra class_export_puts(). */
1436         if (already_disconnected)
1437                 GOTO(no_disconn, already_disconnected);
1438
1439         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1440                export->exp_handle.h_cookie);
1441
1442         class_export_recovery_cleanup(export);
1443         class_unlink_export(export);
1444 no_disconn:
1445         class_export_put(export);
1446         RETURN(0);
1447 }
1448 EXPORT_SYMBOL(class_disconnect);
1449
1450 /* Return non-zero for a fully connected export */
1451 int class_connected_export(struct obd_export *exp)
1452 {
1453         int connected = 0;
1454
1455         if (exp) {
1456                 spin_lock(&exp->exp_lock);
1457                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1458                 spin_unlock(&exp->exp_lock);
1459         }
1460         return connected;
1461 }
1462 EXPORT_SYMBOL(class_connected_export);
1463
1464 static void class_disconnect_export_list(struct list_head *list,
1465                                          enum obd_option flags)
1466 {
1467         int rc;
1468         struct obd_export *exp;
1469         ENTRY;
1470
1471         /* It's possible that an export may disconnect itself, but
1472          * nothing else will be added to this list.
1473          */
1474         while ((exp = list_first_entry_or_null(list, struct obd_export,
1475                                                exp_obd_chain)) != NULL) {
1476                 /* need for safe call CDEBUG after obd_disconnect */
1477                 class_export_get(exp);
1478
1479                 spin_lock(&exp->exp_lock);
1480                 exp->exp_flags = flags;
1481                 spin_unlock(&exp->exp_lock);
1482
1483                 if (obd_uuid_equals(&exp->exp_client_uuid,
1484                                     &exp->exp_obd->obd_uuid)) {
1485                         CDEBUG(D_HA,
1486                                "exp %p export uuid == obd uuid, don't discon\n",
1487                                exp);
1488                         /* Need to delete this now so we don't end up pointing
1489                          * to work_list later when this export is cleaned up. */
1490                         list_del_init(&exp->exp_obd_chain);
1491                         class_export_put(exp);
1492                         continue;
1493                 }
1494
1495                 class_export_get(exp);
1496                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1497                        "last request at %lld\n",
1498                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1499                        exp, exp->exp_last_request_time);
1500                 /* release one export reference anyway */
1501                 rc = obd_disconnect(exp);
1502
1503                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1504                        obd_export_nid2str(exp), exp, rc);
1505                 class_export_put(exp);
1506         }
1507         EXIT;
1508 }
1509
1510 void class_disconnect_exports(struct obd_device *obd)
1511 {
1512         LIST_HEAD(work_list);
1513         ENTRY;
1514
1515         /* Move all of the exports from obd_exports to a work list, en masse. */
1516         spin_lock(&obd->obd_dev_lock);
1517         list_splice_init(&obd->obd_exports, &work_list);
1518         list_splice_init(&obd->obd_delayed_exports, &work_list);
1519         spin_unlock(&obd->obd_dev_lock);
1520
1521         if (!list_empty(&work_list)) {
1522                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1523                        "disconnecting them\n", obd->obd_minor, obd);
1524                 class_disconnect_export_list(&work_list,
1525                                              exp_flags_from_obd(obd));
1526         } else
1527                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1528                        obd->obd_minor, obd);
1529         EXIT;
1530 }
1531 EXPORT_SYMBOL(class_disconnect_exports);
1532
1533 /* Remove exports that have not completed recovery.
1534  */
1535 void class_disconnect_stale_exports(struct obd_device *obd,
1536                                     int (*test_export)(struct obd_export *))
1537 {
1538         LIST_HEAD(work_list);
1539         struct obd_export *exp, *n;
1540         int evicted = 0;
1541         ENTRY;
1542
1543         spin_lock(&obd->obd_dev_lock);
1544         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1545                                  exp_obd_chain) {
1546                 /* don't count self-export as client */
1547                 if (obd_uuid_equals(&exp->exp_client_uuid,
1548                                     &exp->exp_obd->obd_uuid))
1549                         continue;
1550
1551                 /* don't evict clients which have no slot in last_rcvd
1552                  * (e.g. lightweight connection) */
1553                 if (exp->exp_target_data.ted_lr_idx == -1)
1554                         continue;
1555
1556                 spin_lock(&exp->exp_lock);
1557                 if (exp->exp_failed || test_export(exp)) {
1558                         spin_unlock(&exp->exp_lock);
1559                         continue;
1560                 }
1561                 exp->exp_failed = 1;
1562                 spin_unlock(&exp->exp_lock);
1563
1564                 list_move(&exp->exp_obd_chain, &work_list);
1565                 evicted++;
1566                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1567                        obd->obd_name, exp->exp_client_uuid.uuid,
1568                        obd_export_nid2str(exp));
1569                 print_export_data(exp, "EVICTING", 0, D_HA);
1570         }
1571         spin_unlock(&obd->obd_dev_lock);
1572
1573         if (evicted)
1574                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1575                               obd->obd_name, evicted);
1576
1577         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1578                                                  OBD_OPT_ABORT_RECOV);
1579         EXIT;
1580 }
1581 EXPORT_SYMBOL(class_disconnect_stale_exports);
1582
1583 void class_fail_export(struct obd_export *exp)
1584 {
1585         int rc, already_failed;
1586
1587         spin_lock(&exp->exp_lock);
1588         already_failed = exp->exp_failed;
1589         exp->exp_failed = 1;
1590         spin_unlock(&exp->exp_lock);
1591
1592         if (already_failed) {
1593                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1594                        exp, exp->exp_client_uuid.uuid);
1595                 return;
1596         }
1597
1598         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1599                exp, exp->exp_client_uuid.uuid);
1600
1601         if (obd_dump_on_timeout)
1602                 libcfs_debug_dumplog();
1603
1604         /* need for safe call CDEBUG after obd_disconnect */
1605         class_export_get(exp);
1606
1607         /* Most callers into obd_disconnect are removing their own reference
1608          * (request, for example) in addition to the one from the hash table.
1609          * We don't have such a reference here, so make one. */
1610         class_export_get(exp);
1611         rc = obd_disconnect(exp);
1612         if (rc)
1613                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1614         else
1615                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1616                        exp, exp->exp_client_uuid.uuid);
1617         class_export_put(exp);
1618 }
1619 EXPORT_SYMBOL(class_fail_export);
1620
1621 #ifdef HAVE_SERVER_SUPPORT
1622
1623 static int take_first(struct obd_export *exp, void *data)
1624 {
1625         struct obd_export **expp = data;
1626
1627         if (*expp)
1628                 /* already have one */
1629                 return 0;
1630         if (exp->exp_failed)
1631                 /* Don't want this one */
1632                 return 0;
1633         if (!refcount_inc_not_zero(&exp->exp_handle.h_ref))
1634                 /* Cannot get a ref on this one */
1635                 return 0;
1636         *expp = exp;
1637         return 1;
1638 }
1639
1640 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1641 {
1642         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1643         struct obd_export *doomed_exp;
1644         int exports_evicted = 0;
1645
1646         spin_lock(&obd->obd_dev_lock);
1647         /* umount has run already, so evict thread should leave
1648          * its task to umount thread now */
1649         if (obd->obd_stopping) {
1650                 spin_unlock(&obd->obd_dev_lock);
1651                 return exports_evicted;
1652         }
1653         spin_unlock(&obd->obd_dev_lock);
1654
1655         doomed_exp = NULL;
1656         while (obd_nid_export_for_each(obd, nid_key,
1657                                        take_first, &doomed_exp) > 0) {
1658
1659                 LASSERTF(doomed_exp != obd->obd_self_export,
1660                          "self-export is hashed by NID?\n");
1661
1662                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1663                               obd->obd_name,
1664                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1665                               obd_export_nid2str(doomed_exp));
1666
1667                 class_fail_export(doomed_exp);
1668                 class_export_put(doomed_exp);
1669                 exports_evicted++;
1670                 doomed_exp = NULL;
1671         }
1672
1673         if (!exports_evicted)
1674                 CDEBUG(D_HA,
1675                        "%s: can't disconnect NID '%s': no exports found\n",
1676                        obd->obd_name, nid);
1677         return exports_evicted;
1678 }
1679 EXPORT_SYMBOL(obd_export_evict_by_nid);
1680
1681 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1682 {
1683         struct obd_export *doomed_exp = NULL;
1684         struct obd_uuid doomed_uuid;
1685         int exports_evicted = 0;
1686
1687         spin_lock(&obd->obd_dev_lock);
1688         if (obd->obd_stopping) {
1689                 spin_unlock(&obd->obd_dev_lock);
1690                 return exports_evicted;
1691         }
1692         spin_unlock(&obd->obd_dev_lock);
1693
1694         obd_str2uuid(&doomed_uuid, uuid);
1695         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1696                 CERROR("%s: can't evict myself\n", obd->obd_name);
1697                 return exports_evicted;
1698         }
1699
1700         doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1701         if (doomed_exp == NULL) {
1702                 CERROR("%s: can't disconnect %s: no exports found\n",
1703                        obd->obd_name, uuid);
1704         } else {
1705                 CWARN("%s: evicting %s at adminstrative request\n",
1706                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1707                 class_fail_export(doomed_exp);
1708                 class_export_put(doomed_exp);
1709                 obd_uuid_del(obd, doomed_exp);
1710                 exports_evicted++;
1711         }
1712
1713         return exports_evicted;
1714 }
1715 #endif /* HAVE_SERVER_SUPPORT */
1716
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump callback, unset by default.  When it is set,
 * print_export_data() calls it for each export it prints if the caller
 * asked for lock information.
 */
void (*class_export_dump_hook)(struct obd_export *exp) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1721
1722 static void print_export_data(struct obd_export *exp, const char *status,
1723                               int locks, int debug_level)
1724 {
1725         struct ptlrpc_reply_state *rs;
1726         struct ptlrpc_reply_state *first_reply = NULL;
1727         int nreplies = 0;
1728
1729         spin_lock(&exp->exp_lock);
1730         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1731                             rs_exp_list) {
1732                 if (nreplies == 0)
1733                         first_reply = rs;
1734                 nreplies++;
1735         }
1736         spin_unlock(&exp->exp_lock);
1737
1738         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1739                "%p %s %llu stale:%d\n",
1740                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1741                obd_export_nid2str(exp),
1742                refcount_read(&exp->exp_handle.h_ref),
1743                atomic_read(&exp->exp_rpc_count),
1744                atomic_read(&exp->exp_cb_count),
1745                atomic_read(&exp->exp_locks_count),
1746                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1747                nreplies, first_reply, nreplies > 3 ? "..." : "",
1748                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1749 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1750         if (locks && class_export_dump_hook != NULL)
1751                 class_export_dump_hook(exp);
1752 #endif
1753 }
1754
1755 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1756 {
1757         struct obd_export *exp;
1758
1759         spin_lock(&obd->obd_dev_lock);
1760         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1761                 print_export_data(exp, "ACTIVE", locks, debug_level);
1762         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1763                 print_export_data(exp, "UNLINKED", locks, debug_level);
1764         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1765                 print_export_data(exp, "DELAYED", locks, debug_level);
1766         spin_unlock(&obd->obd_dev_lock);
1767 }
1768
1769 void obd_exports_barrier(struct obd_device *obd)
1770 {
1771         int waited = 2;
1772         LASSERT(list_empty(&obd->obd_exports));
1773         spin_lock(&obd->obd_dev_lock);
1774         while (!list_empty(&obd->obd_unlinked_exports)) {
1775                 spin_unlock(&obd->obd_dev_lock);
1776                 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1777                 if (waited > 5 && is_power_of_2(waited)) {
1778                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1779                                       "more than %d seconds. "
1780                                       "The obd refcount = %d. Is it stuck?\n",
1781                                       obd->obd_name, waited,
1782                                       atomic_read(&obd->obd_refcount));
1783                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1784                 }
1785                 waited *= 2;
1786                 spin_lock(&obd->obd_dev_lock);
1787         }
1788         spin_unlock(&obd->obd_dev_lock);
1789 }
1790 EXPORT_SYMBOL(obd_exports_barrier);
1791
1792 /**
1793  * Add export to the obd_zombe thread and notify it.
1794  */
1795 static void obd_zombie_export_add(struct obd_export *exp) {
1796         atomic_dec(&obd_stale_export_num);
1797         spin_lock(&exp->exp_obd->obd_dev_lock);
1798         LASSERT(!list_empty(&exp->exp_obd_chain));
1799         list_del_init(&exp->exp_obd_chain);
1800         spin_unlock(&exp->exp_obd->obd_dev_lock);
1801
1802         queue_work(zombie_wq, &exp->exp_zombie_work);
1803 }
1804
1805 /**
1806  * Add import to the obd_zombe thread and notify it.
1807  */
1808 static void obd_zombie_import_add(struct obd_import *imp) {
1809         LASSERT(imp->imp_sec == NULL);
1810
1811         queue_work(zombie_wq, &imp->imp_zombie_work);
1812 }
1813
1814 /**
1815  * wait when obd_zombie import/export queues become empty
1816  */
1817 void obd_zombie_barrier(void)
1818 {
1819         flush_workqueue(zombie_wq);
1820 }
1821 EXPORT_SYMBOL(obd_zombie_barrier);
1822
1823
1824 struct obd_export *obd_stale_export_get(void)
1825 {
1826         struct obd_export *exp = NULL;
1827         ENTRY;
1828
1829         spin_lock(&obd_stale_export_lock);
1830         if (!list_empty(&obd_stale_exports)) {
1831                 exp = list_first_entry(&obd_stale_exports,
1832                                        struct obd_export, exp_stale_list);
1833                 list_del_init(&exp->exp_stale_list);
1834         }
1835         spin_unlock(&obd_stale_export_lock);
1836
1837         if (exp) {
1838                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1839                        atomic_read(&obd_stale_export_num));
1840         }
1841         RETURN(exp);
1842 }
1843 EXPORT_SYMBOL(obd_stale_export_get);
1844
1845 void obd_stale_export_put(struct obd_export *exp)
1846 {
1847         ENTRY;
1848
1849         LASSERT(list_empty(&exp->exp_stale_list));
1850         if (exp->exp_lock_hash &&
1851             atomic_read(&exp->exp_lock_hash->hs_count)) {
1852                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1853                        atomic_read(&obd_stale_export_num));
1854
1855                 spin_lock_bh(&exp->exp_bl_list_lock);
1856                 spin_lock(&obd_stale_export_lock);
1857                 /* Add to the tail if there is no blocked locks,
1858                  * to the head otherwise. */
1859                 if (list_empty(&exp->exp_bl_list))
1860                         list_add_tail(&exp->exp_stale_list,
1861                                       &obd_stale_exports);
1862                 else
1863                         list_add(&exp->exp_stale_list,
1864                                  &obd_stale_exports);
1865
1866                 spin_unlock(&obd_stale_export_lock);
1867                 spin_unlock_bh(&exp->exp_bl_list_lock);
1868         } else {
1869                 class_export_put(exp);
1870         }
1871         EXIT;
1872 }
1873 EXPORT_SYMBOL(obd_stale_export_put);
1874
1875 /**
1876  * Adjust the position of the export in the stale list,
1877  * i.e. move to the head of the list if is needed.
1878  **/
1879 void obd_stale_export_adjust(struct obd_export *exp)
1880 {
1881         LASSERT(exp != NULL);
1882         spin_lock_bh(&exp->exp_bl_list_lock);
1883         spin_lock(&obd_stale_export_lock);
1884
1885         if (!list_empty(&exp->exp_stale_list) &&
1886             !list_empty(&exp->exp_bl_list))
1887                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1888
1889         spin_unlock(&obd_stale_export_lock);
1890         spin_unlock_bh(&exp->exp_bl_list_lock);
1891 }
1892 EXPORT_SYMBOL(obd_stale_export_adjust);
1893
1894 /**
1895  * start destroy zombie import/export thread
1896  */
1897 int obd_zombie_impexp_init(void)
1898 {
1899         zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
1900                                            0, CFS_CPT_ANY,
1901                                            cfs_cpt_number(cfs_cpt_tab));
1902
1903         return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
1904 }
1905
1906 /**
1907  * stop destroy zombie import/export thread
1908  */
1909 void obd_zombie_impexp_stop(void)
1910 {
1911         destroy_workqueue(zombie_wq);
1912         LASSERT(list_empty(&obd_stale_exports));
1913 }
1914
1915 /***** Kernel-userspace comm helpers *******/
1916
1917 /* Get length of entire message, including header */
1918 int kuc_len(int payload_len)
1919 {
1920         return sizeof(struct kuc_hdr) + payload_len;
1921 }
1922 EXPORT_SYMBOL(kuc_len);
1923
1924 /* Get a pointer to kuc header, given a ptr to the payload
1925  * @param p Pointer to payload area
1926  * @returns Pointer to kuc header
1927  */
1928 struct kuc_hdr * kuc_ptr(void *p)
1929 {
1930         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1931         LASSERT(lh->kuc_magic == KUC_MAGIC);
1932         return lh;
1933 }
1934 EXPORT_SYMBOL(kuc_ptr);
1935
1936 /* Alloc space for a message, and fill in header
1937  * @return Pointer to payload area
1938  */
1939 void *kuc_alloc(int payload_len, int transport, int type)
1940 {
1941         struct kuc_hdr *lh;
1942         int len = kuc_len(payload_len);
1943
1944         OBD_ALLOC(lh, len);
1945         if (lh == NULL)
1946                 return ERR_PTR(-ENOMEM);
1947
1948         lh->kuc_magic = KUC_MAGIC;
1949         lh->kuc_transport = transport;
1950         lh->kuc_msgtype = type;
1951         lh->kuc_msglen = len;
1952
1953         return (void *)(lh + 1);
1954 }
1955 EXPORT_SYMBOL(kuc_alloc);
1956
/* Free a message given a pointer to its payload area and the payload
 * length it was allocated with.
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);
	int total = kuc_len(payload_len);

	OBD_FREE(hdr, total);
}
EXPORT_SYMBOL(kuc_free);
1964
1965 struct obd_request_slot_waiter {
1966         struct list_head        orsw_entry;
1967         wait_queue_head_t       orsw_waitq;
1968         bool                    orsw_signaled;
1969 };
1970
1971 static bool obd_request_slot_avail(struct client_obd *cli,
1972                                    struct obd_request_slot_waiter *orsw)
1973 {
1974         bool avail;
1975
1976         spin_lock(&cli->cl_loi_list_lock);
1977         avail = !!list_empty(&orsw->orsw_entry);
1978         spin_unlock(&cli->cl_loi_list_lock);
1979
1980         return avail;
1981 };
1982
1983 /*
1984  * For network flow control, the RPC sponsor needs to acquire a credit
1985  * before sending the RPC. The credits count for a connection is defined
1986  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1987  * the subsequent RPC sponsors need to wait until others released their
1988  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1989  */
1990 int obd_get_request_slot(struct client_obd *cli)
1991 {
1992         struct obd_request_slot_waiter   orsw;
1993         int                              rc;
1994
1995         spin_lock(&cli->cl_loi_list_lock);
1996         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
1997                 cli->cl_rpcs_in_flight++;
1998                 spin_unlock(&cli->cl_loi_list_lock);
1999                 return 0;
2000         }
2001
2002         init_waitqueue_head(&orsw.orsw_waitq);
2003         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2004         orsw.orsw_signaled = false;
2005         spin_unlock(&cli->cl_loi_list_lock);
2006
2007         rc = l_wait_event_abortable(orsw.orsw_waitq,
2008                                     obd_request_slot_avail(cli, &orsw) ||
2009                                     orsw.orsw_signaled);
2010
2011         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2012          * freed but other (such as obd_put_request_slot) is using it. */
2013         spin_lock(&cli->cl_loi_list_lock);
2014         if (rc != 0) {
2015                 if (!orsw.orsw_signaled) {
2016                         if (list_empty(&orsw.orsw_entry))
2017                                 cli->cl_rpcs_in_flight--;
2018                         else
2019                                 list_del(&orsw.orsw_entry);
2020                 }
2021                 rc = -EINTR;
2022         }
2023
2024         if (orsw.orsw_signaled) {
2025                 LASSERT(list_empty(&orsw.orsw_entry));
2026
2027                 rc = -EINTR;
2028         }
2029         spin_unlock(&cli->cl_loi_list_lock);
2030
2031         return rc;
2032 }
2033 EXPORT_SYMBOL(obd_get_request_slot);
2034
2035 void obd_put_request_slot(struct client_obd *cli)
2036 {
2037         struct obd_request_slot_waiter *orsw;
2038
2039         spin_lock(&cli->cl_loi_list_lock);
2040         cli->cl_rpcs_in_flight--;
2041
2042         /* If there is free slot, wakeup the first waiter. */
2043         if (!list_empty(&cli->cl_flight_waiters) &&
2044             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2045                 orsw = list_first_entry(&cli->cl_flight_waiters,
2046                                         struct obd_request_slot_waiter,
2047                                         orsw_entry);
2048                 list_del_init(&orsw->orsw_entry);
2049                 cli->cl_rpcs_in_flight++;
2050                 wake_up(&orsw->orsw_waitq);
2051         }
2052         spin_unlock(&cli->cl_loi_list_lock);
2053 }
2054 EXPORT_SYMBOL(obd_put_request_slot);
2055
2056 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2057 {
2058         return cli->cl_max_rpcs_in_flight;
2059 }
2060 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2061
2062 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2063 {
2064         struct obd_request_slot_waiter *orsw;
2065         __u32                           old;
2066         int                             diff;
2067         int                             i;
2068         int                             rc;
2069
2070         if (max > OBD_MAX_RIF_MAX || max < 1)
2071                 return -ERANGE;
2072
2073         CDEBUG(D_INFO, "%s: max = %hu max_mod = %u rif = %u\n",
2074                cli->cl_import->imp_obd->obd_name, max,
2075                cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
2076
2077         if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
2078                    LUSTRE_MDC_NAME) == 0) {
2079                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2080                  * strictly lower that max_rpcs_in_flight */
2081                 if (max < 2) {
2082                         CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2083                                cli->cl_import->imp_obd->obd_name);
2084                         return -ERANGE;
2085                 }
2086                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2087                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2088                         if (rc != 0)
2089                                 return rc;
2090                 }
2091         }
2092
2093         spin_lock(&cli->cl_loi_list_lock);
2094         old = cli->cl_max_rpcs_in_flight;
2095         cli->cl_max_rpcs_in_flight = max;
2096         client_adjust_max_dirty(cli);
2097
2098         diff = max - old;
2099
2100         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2101         for (i = 0; i < diff; i++) {
2102                 orsw = list_first_entry_or_null(&cli->cl_loi_read_list,
2103                                                 struct obd_request_slot_waiter,
2104                                                 orsw_entry);
2105                 if (!orsw)
2106                         break;
2107
2108                 list_del_init(&orsw->orsw_entry);
2109                 cli->cl_rpcs_in_flight++;
2110                 wake_up(&orsw->orsw_waitq);
2111         }
2112         spin_unlock(&cli->cl_loi_list_lock);
2113
2114         return 0;
2115 }
2116 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2117
2118 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2119 {
2120         return cli->cl_max_mod_rpcs_in_flight;
2121 }
2122 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2123
2124 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2125 {
2126         struct obd_connect_data *ocd;
2127         __u16 maxmodrpcs;
2128         __u16 prev;
2129
2130         if (max > OBD_MAX_RIF_MAX || max < 1)
2131                 return -ERANGE;
2132
2133         ocd = &cli->cl_import->imp_connect_data;
2134         CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
2135                cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
2136                ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
2137
2138         if (max == OBD_MAX_RIF_MAX)
2139                 max = OBD_MAX_RIF_MAX - 1;
2140
2141         /* Cannot exceed or equal max_rpcs_in_flight.  If we are asked to
2142          * increase this value, also bump up max_rpcs_in_flight to match.
2143          */
2144         if (max >= cli->cl_max_rpcs_in_flight) {
2145                 CDEBUG(D_INFO,
2146                        "%s: increasing max_rpcs_in_flight=%hu to allow larger max_mod_rpcs_in_flight=%u\n",
2147                        cli->cl_import->imp_obd->obd_name, max + 1, max);
2148                 obd_set_max_rpcs_in_flight(cli, max + 1);
2149         }
2150
2151         /* cannot exceed max modify RPCs in flight supported by the server,
2152          * but verify ocd_connect_flags is at least initialized first.  If
2153          * not, allow it and fix value later in ptlrpc_connect_set_flags().
2154          */
2155         if (!ocd->ocd_connect_flags) {
2156                 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2157         } else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
2158                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2159                 if (maxmodrpcs == 0) { /* connection not finished yet */
2160                         maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2161                         CDEBUG(D_INFO,
2162                                "%s: partial connect, assume maxmodrpcs=%hu\n",
2163                                cli->cl_import->imp_obd->obd_name, maxmodrpcs);
2164                 }
2165         } else {
2166                 maxmodrpcs = 1;
2167         }
2168         if (max > maxmodrpcs) {
2169                 CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than ocd_maxmodrpcs=%hu returned by the server at connection\n",
2170                        cli->cl_import->imp_obd->obd_name,
2171                        max, maxmodrpcs);
2172                 return -ERANGE;
2173         }
2174
2175         spin_lock(&cli->cl_mod_rpcs_lock);
2176
2177         prev = cli->cl_max_mod_rpcs_in_flight;
2178         cli->cl_max_mod_rpcs_in_flight = max;
2179
2180         /* wakeup waiters if limit has been increased */
2181         if (cli->cl_max_mod_rpcs_in_flight > prev)
2182                 wake_up(&cli->cl_mod_rpcs_waitq);
2183
2184         spin_unlock(&cli->cl_mod_rpcs_lock);
2185
2186         return 0;
2187 }
2188 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2189
2190 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2191                                struct seq_file *seq)
2192 {
2193         unsigned long mod_tot = 0, mod_cum;
2194         struct timespec64 now;
2195         int i;
2196
2197         ktime_get_real_ts64(&now);
2198
2199         spin_lock(&cli->cl_mod_rpcs_lock);
2200
2201         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2202                    (s64)now.tv_sec, now.tv_nsec);
2203         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2204                    cli->cl_mod_rpcs_in_flight);
2205
2206         seq_printf(seq, "\n\t\t\tmodify\n");
2207         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2208
2209         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2210
2211         mod_cum = 0;
2212         for (i = 0; i < OBD_HIST_MAX; i++) {
2213                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2214                 mod_cum += mod;
2215                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2216                            i, mod, pct(mod, mod_tot),
2217                            pct(mod_cum, mod_tot));
2218                 if (mod_cum == mod_tot)
2219                         break;
2220         }
2221
2222         spin_unlock(&cli->cl_mod_rpcs_lock);
2223
2224         return 0;
2225 }
2226 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2227
2228 /* The number of modify RPCs sent in parallel is limited
2229  * because the server has a finite number of slots per client to
2230  * store request result and ensure reply reconstruction when needed.
2231  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2232  * that takes into account server limit and cl_max_rpcs_in_flight
2233  * value.
2234  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2235  * one close request is allowed above the maximum.
2236  */
2237 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2238                                                  bool close_req)
2239 {
2240         bool avail;
2241
2242         /* A slot is available if
2243          * - number of modify RPCs in flight is less than the max
2244          * - it's a close RPC and no other close request is in flight
2245          */
2246         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2247                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2248
2249         return avail;
2250 }
2251
2252 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2253                                          bool close_req)
2254 {
2255         bool avail;
2256
2257         spin_lock(&cli->cl_mod_rpcs_lock);
2258         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2259         spin_unlock(&cli->cl_mod_rpcs_lock);
2260         return avail;
2261 }
2262
2263
2264 /* Get a modify RPC slot from the obd client @cli according
2265  * to the kind of operation @opc that is going to be sent
2266  * and the intent @it of the operation if it applies.
2267  * If the maximum number of modify RPCs in flight is reached
2268  * the thread is put to sleep.
2269  * Returns the tag to be set in the request message. Tag 0
2270  * is reserved for non-modifying requests.
2271  */
2272 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2273 {
2274         bool                    close_req = false;
2275         __u16                   i, max;
2276
2277         if (opc == MDS_CLOSE)
2278                 close_req = true;
2279
2280         do {
2281                 spin_lock(&cli->cl_mod_rpcs_lock);
2282                 max = cli->cl_max_mod_rpcs_in_flight;
2283                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2284                         /* there is a slot available */
2285                         cli->cl_mod_rpcs_in_flight++;
2286                         if (close_req)
2287                                 cli->cl_close_rpcs_in_flight++;
2288                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2289                                          cli->cl_mod_rpcs_in_flight);
2290                         /* find a free tag */
2291                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2292                                                 max + 1);
2293                         LASSERT(i < OBD_MAX_RIF_MAX);
2294                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2295                         spin_unlock(&cli->cl_mod_rpcs_lock);
2296                         /* tag 0 is reserved for non-modify RPCs */
2297
2298                         CDEBUG(D_RPCTRACE,
2299                                "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2300                                cli->cl_import->imp_obd->obd_name,
2301                                i + 1, opc, max);
2302
2303                         return i + 1;
2304                 }
2305                 spin_unlock(&cli->cl_mod_rpcs_lock);
2306
2307                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2308                        "opc %u, max %hu\n",
2309                        cli->cl_import->imp_obd->obd_name, opc, max);
2310
2311                 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2312                                           obd_mod_rpc_slot_avail(cli,
2313                                                                  close_req));
2314         } while (true);
2315 }
2316 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2317
2318 /* Put a modify RPC slot from the obd client @cli according
2319  * to the kind of operation @opc that has been sent.
2320  */
2321 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2322 {
2323         bool                    close_req = false;
2324
2325         if (tag == 0)
2326                 return;
2327
2328         if (opc == MDS_CLOSE)
2329                 close_req = true;
2330
2331         spin_lock(&cli->cl_mod_rpcs_lock);
2332         cli->cl_mod_rpcs_in_flight--;
2333         if (close_req)
2334                 cli->cl_close_rpcs_in_flight--;
2335         /* release the tag in the bitmap */
2336         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2337         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2338         spin_unlock(&cli->cl_mod_rpcs_lock);
2339         /* LU-14741 - to prevent close RPCs stuck behind normal ones */
2340         if (close_req)
2341                 wake_up_all(&cli->cl_mod_rpcs_waitq);
2342         else
2343                 wake_up(&cli->cl_mod_rpcs_waitq);
2344 }
2345 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2346