Whamcloud - gitweb
LU-1904 idl: add checks for OBD_CONNECT flags
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/obdclass/genops.c
32  *
33  * These are the only exported functions, they provide some generic
34  * infrastructure for managing object devices
35  */
36
37 #define DEBUG_SUBSYSTEM S_CLASS
38
39 #include <linux/pid_namespace.h>
40 #include <linux/workqueue.h>
41 #include <lustre_compat.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
47
/* Guards obd_devs[]: write-locked when a slot is filled or cleared,
 * read-locked by the lookup helpers in this file. */
DEFINE_RWLOCK(obd_dev_lock);
/* Registered devices indexed by obd_minor; a NULL entry is a free slot. */
static struct obd_device *obd_devs[MAX_OBD_DEVICES];

/* Slab cache that obd_device_alloc()/obd_device_free() draw from. */
static struct kmem_cache *obd_device_cachep;
/* Declared early so class_search_type() can compare against it; the
 * initialiser follows class_sysfs_release(). */
static struct kobj_type class_ktype;
static struct workqueue_struct *zombie_wq;

/* Prototypes for static helpers used before their definition. */
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks, int debug_level);

static LIST_HEAD(obd_stale_exports);
static DEFINE_SPINLOCK(obd_stale_export_lock);
static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
63
64 /*
65  * support functions: we could use inter-module communication, but this
66  * is more portable to other OS's
67  */
68 static struct obd_device *obd_device_alloc(void)
69 {
70         struct obd_device *obd;
71
72         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
73         if (obd != NULL) {
74                 obd->obd_magic = OBD_DEVICE_MAGIC;
75         }
76         return obd;
77 }
78
79 static void obd_device_free(struct obd_device *obd)
80 {
81         LASSERT(obd != NULL);
82         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
83                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
84         if (obd->obd_namespace != NULL) {
85                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
86                        obd, obd->obd_namespace, obd->obd_force);
87                 LBUG();
88         }
89         lu_ref_fini(&obd->obd_reference);
90         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
91 }
92
93 struct obd_type *class_search_type(const char *name)
94 {
95         struct kobject *kobj = kset_find_obj(lustre_kset, name);
96
97         if (kobj && kobj->ktype == &class_ktype)
98                 return container_of(kobj, struct obd_type, typ_kobj);
99
100         kobject_put(kobj);
101         return NULL;
102 }
103 EXPORT_SYMBOL(class_search_type);
104
105 struct obd_type *class_get_type(const char *name)
106 {
107         struct obd_type *type;
108
109         type = class_search_type(name);
110 #ifdef HAVE_MODULE_LOADING_SUPPORT
111         if (!type) {
112                 const char *modname = name;
113
114 #ifdef HAVE_SERVER_SUPPORT
115                 if (strcmp(modname, "obdfilter") == 0)
116                         modname = "ofd";
117
118                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
119                         modname = LUSTRE_OSP_NAME;
120
121                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
122                         modname = LUSTRE_MDT_NAME;
123 #endif /* HAVE_SERVER_SUPPORT */
124
125                 if (!request_module("%s", modname)) {
126                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
127                         type = class_search_type(name);
128                 } else {
129                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
130                                            modname);
131                 }
132         }
133 #endif
134         if (type) {
135                 if (try_module_get(type->typ_dt_ops->o_owner)) {
136                         atomic_inc(&type->typ_refcnt);
137                         /* class_search_type() returned a counted reference,
138                          * but we don't need that count any more as
139                          * we have one through typ_refcnt.
140                          */
141                         kobject_put(&type->typ_kobj);
142                 } else {
143                         kobject_put(&type->typ_kobj);
144                         type = NULL;
145                 }
146         }
147         return type;
148 }
149 EXPORT_SYMBOL(class_get_type);
150
151 void class_put_type(struct obd_type *type)
152 {
153         LASSERT(type);
154         module_put(type->typ_dt_ops->o_owner);
155         atomic_dec(&type->typ_refcnt);
156 }
157 EXPORT_SYMBOL(class_put_type);
158
159 static void class_sysfs_release(struct kobject *kobj)
160 {
161         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
162
163         debugfs_remove_recursive(type->typ_debugfs_entry);
164         type->typ_debugfs_entry = NULL;
165
166         if (type->typ_lu)
167                 lu_device_type_fini(type->typ_lu);
168
169 #ifdef CONFIG_PROC_FS
170         if (type->typ_name && type->typ_procroot)
171                 remove_proc_subtree(type->typ_name, proc_lustre_root);
172 #endif
173         OBD_FREE(type, sizeof(*type));
174 }
175
176 static struct kobj_type class_ktype = {
177         .sysfs_ops      = &lustre_sysfs_ops,
178         .release        = class_sysfs_release,
179 };
180
181 #ifdef HAVE_SERVER_SUPPORT
182 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
183 {
184         struct dentry *symlink;
185         struct obd_type *type;
186         int rc;
187
188         type = class_search_type(name);
189         if (type) {
190                 kobject_put(&type->typ_kobj);
191                 return ERR_PTR(-EEXIST);
192         }
193
194         OBD_ALLOC(type, sizeof(*type));
195         if (!type)
196                 return ERR_PTR(-ENOMEM);
197
198         type->typ_kobj.kset = lustre_kset;
199         rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
200                                   &lustre_kset->kobj, "%s", name);
201         if (rc)
202                 return ERR_PTR(rc);
203
204         symlink = debugfs_create_dir(name, debugfs_lustre_root);
205         type->typ_debugfs_entry = symlink;
206         type->typ_sym_filter = true;
207
208         if (enable_proc) {
209                 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
210                                                       NULL, NULL);
211                 if (IS_ERR(type->typ_procroot)) {
212                         CERROR("%s: can't create compat proc entry: %d\n",
213                                name, (int)PTR_ERR(type->typ_procroot));
214                         type->typ_procroot = NULL;
215                 }
216         }
217
218         return type;
219 }
220 EXPORT_SYMBOL(class_add_symlinks);
221 #endif /* HAVE_SERVER_SUPPORT */
222
223 #define CLASS_MAX_NAME 1024
224
225 int class_register_type(const struct obd_ops *dt_ops,
226                         const struct md_ops *md_ops,
227                         bool enable_proc,
228                         const char *name, struct lu_device_type *ldt)
229 {
230         struct obd_type *type;
231         int rc;
232
233         ENTRY;
234         /* sanity check */
235         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
236
237         type = class_search_type(name);
238         if (type) {
239 #ifdef HAVE_SERVER_SUPPORT
240                 if (type->typ_sym_filter)
241                         goto dir_exist;
242 #endif /* HAVE_SERVER_SUPPORT */
243                 kobject_put(&type->typ_kobj);
244                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
245                 RETURN(-EEXIST);
246         }
247
248         OBD_ALLOC(type, sizeof(*type));
249         if (type == NULL)
250                 RETURN(-ENOMEM);
251
252         type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
253         type->typ_kobj.kset = lustre_kset;
254         kobject_init(&type->typ_kobj, &class_ktype);
255 #ifdef HAVE_SERVER_SUPPORT
256 dir_exist:
257 #endif /* HAVE_SERVER_SUPPORT */
258
259         type->typ_dt_ops = dt_ops;
260         type->typ_md_ops = md_ops;
261
262 #ifdef HAVE_SERVER_SUPPORT
263         if (type->typ_sym_filter) {
264                 type->typ_sym_filter = false;
265                 kobject_put(&type->typ_kobj);
266                 goto setup_ldt;
267         }
268 #endif
269 #ifdef CONFIG_PROC_FS
270         if (enable_proc && !type->typ_procroot) {
271                 type->typ_procroot = lprocfs_register(name,
272                                                       proc_lustre_root,
273                                                       NULL, type);
274                 if (IS_ERR(type->typ_procroot)) {
275                         rc = PTR_ERR(type->typ_procroot);
276                         type->typ_procroot = NULL;
277                         GOTO(failed, rc);
278                 }
279         }
280 #endif
281         type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
282
283         rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
284         if (rc)
285                 GOTO(failed, rc);
286 #ifdef HAVE_SERVER_SUPPORT
287 setup_ldt:
288 #endif
289         if (ldt) {
290                 rc = lu_device_type_init(ldt);
291                 smp_store_release(&type->typ_lu, rc ? NULL : ldt);
292                 wake_up_var(&type->typ_lu);
293                 if (rc)
294                         GOTO(failed, rc);
295         }
296
297         RETURN(0);
298
299 failed:
300         kobject_put(&type->typ_kobj);
301
302         RETURN(rc);
303 }
304 EXPORT_SYMBOL(class_register_type);
305
306 int class_unregister_type(const char *name)
307 {
308         struct obd_type *type = class_search_type(name);
309         int rc = 0;
310         ENTRY;
311
312         if (!type) {
313                 CERROR("unknown obd type\n");
314                 RETURN(-EINVAL);
315         }
316
317         if (atomic_read(&type->typ_refcnt)) {
318                 CERROR("type %s has refcount (%d)\n", name,
319                        atomic_read(&type->typ_refcnt));
320                 /* This is a bad situation, let's make the best of it */
321                 /* Remove ops, but leave the name for debugging */
322                 type->typ_dt_ops = NULL;
323                 type->typ_md_ops = NULL;
324                 GOTO(out_put, rc = -EBUSY);
325         }
326
327         /* Put the final ref */
328         kobject_put(&type->typ_kobj);
329 out_put:
330         /* Put the ref returned by class_search_type() */
331         kobject_put(&type->typ_kobj);
332
333         RETURN(rc);
334 } /* class_unregister_type */
335 EXPORT_SYMBOL(class_unregister_type);
336
337 /**
338  * Create a new obd device.
339  *
340  * Allocate the new obd_device and initialize it.
341  *
342  * \param[in] type_name obd device type string.
343  * \param[in] name      obd device name.
344  * \param[in] uuid      obd device UUID
345  *
346  * \retval newdev         pointer to created obd_device
347  * \retval ERR_PTR(errno) on error
348  */
349 struct obd_device *class_newdev(const char *type_name, const char *name,
350                                 const char *uuid)
351 {
352         struct obd_device *newdev;
353         struct obd_type *type = NULL;
354         ENTRY;
355
356         if (strlen(name) >= MAX_OBD_NAME) {
357                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
358                 RETURN(ERR_PTR(-EINVAL));
359         }
360
361         type = class_get_type(type_name);
362         if (type == NULL){
363                 CERROR("OBD: unknown type: %s\n", type_name);
364                 RETURN(ERR_PTR(-ENODEV));
365         }
366
367         newdev = obd_device_alloc();
368         if (newdev == NULL) {
369                 class_put_type(type);
370                 RETURN(ERR_PTR(-ENOMEM));
371         }
372         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
373         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
374         newdev->obd_type = type;
375         newdev->obd_minor = -1;
376
377         rwlock_init(&newdev->obd_pool_lock);
378         newdev->obd_pool_limit = 0;
379         newdev->obd_pool_slv = 0;
380
381         INIT_LIST_HEAD(&newdev->obd_exports);
382         newdev->obd_num_exports = 0;
383         newdev->obd_grant_check_threshold = 100;
384         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
385         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
386         INIT_LIST_HEAD(&newdev->obd_exports_timed);
387         INIT_LIST_HEAD(&newdev->obd_nid_stats);
388         spin_lock_init(&newdev->obd_nid_lock);
389         spin_lock_init(&newdev->obd_dev_lock);
390         mutex_init(&newdev->obd_dev_mutex);
391         spin_lock_init(&newdev->obd_osfs_lock);
392         /* newdev->obd_osfs_age must be set to a value in the distant
393          * past to guarantee a fresh statfs is fetched on mount. */
394         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
395
396         /* XXX belongs in setup not attach  */
397         init_rwsem(&newdev->obd_observer_link_sem);
398         /* recovery data */
399         spin_lock_init(&newdev->obd_recovery_task_lock);
400         init_waitqueue_head(&newdev->obd_next_transno_waitq);
401         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
402         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
403         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
404         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
405         INIT_LIST_HEAD(&newdev->obd_evict_list);
406         INIT_LIST_HEAD(&newdev->obd_lwp_list);
407
408         llog_group_init(&newdev->obd_olg);
409         /* Detach drops this */
410         atomic_set(&newdev->obd_refcount, 1);
411         lu_ref_init(&newdev->obd_reference);
412         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
413
414         newdev->obd_conn_inprogress = 0;
415
416         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
417
418         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
419                newdev->obd_name, newdev);
420
421         return newdev;
422 }
423
424 /**
425  * Free obd device.
426  *
427  * \param[in] obd obd_device to be freed
428  *
429  * \retval none
430  */
431 void class_free_dev(struct obd_device *obd)
432 {
433         struct obd_type *obd_type = obd->obd_type;
434
435         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
436                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
437         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
438                  "obd %p != obd_devs[%d] %p\n",
439                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
440         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
441                  "obd_refcount should be 0, not %d\n",
442                  atomic_read(&obd->obd_refcount));
443         LASSERT(obd_type != NULL);
444
445         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
446                obd->obd_name, obd->obd_type->typ_name);
447
448         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
449                          obd->obd_name, obd->obd_uuid.uuid);
450         if (obd->obd_stopping) {
451                 int err;
452
453                 /* If we're not stopping, we were never set up */
454                 err = obd_cleanup(obd);
455                 if (err)
456                         CERROR("Cleanup %s returned %d\n",
457                                 obd->obd_name, err);
458         }
459
460         obd_device_free(obd);
461
462         class_put_type(obd_type);
463 }
464
465 /**
466  * Unregister obd device.
467  *
468  * Free slot in obd_dev[] used by \a obd.
469  *
470  * \param[in] new_obd obd_device to be unregistered
471  *
472  * \retval none
473  */
474 void class_unregister_device(struct obd_device *obd)
475 {
476         write_lock(&obd_dev_lock);
477         if (obd->obd_minor >= 0) {
478                 LASSERT(obd_devs[obd->obd_minor] == obd);
479                 obd_devs[obd->obd_minor] = NULL;
480                 obd->obd_minor = -1;
481         }
482         write_unlock(&obd_dev_lock);
483 }
484
485 /**
486  * Register obd device.
487  *
488  * Find free slot in obd_devs[], fills it with \a new_obd.
489  *
490  * \param[in] new_obd obd_device to be registered
491  *
492  * \retval 0          success
493  * \retval -EEXIST    device with this name is registered
494  * \retval -EOVERFLOW obd_devs[] is full
495  */
496 int class_register_device(struct obd_device *new_obd)
497 {
498         int ret = 0;
499         int i;
500         int new_obd_minor = 0;
501         bool minor_assign = false;
502         bool retried = false;
503
504 again:
505         write_lock(&obd_dev_lock);
506         for (i = 0; i < class_devno_max(); i++) {
507                 struct obd_device *obd = class_num2obd(i);
508
509                 if (obd != NULL &&
510                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
511
512                         if (!retried) {
513                                 write_unlock(&obd_dev_lock);
514
515                                 /* the obd_device could be waited to be
516                                  * destroyed by the "obd_zombie_impexp_thread".
517                                  */
518                                 obd_zombie_barrier();
519                                 retried = true;
520                                 goto again;
521                         }
522
523                         CERROR("%s: already exists, won't add\n",
524                                obd->obd_name);
525                         /* in case we found a free slot before duplicate */
526                         minor_assign = false;
527                         ret = -EEXIST;
528                         break;
529                 }
530                 if (!minor_assign && obd == NULL) {
531                         new_obd_minor = i;
532                         minor_assign = true;
533                 }
534         }
535
536         if (minor_assign) {
537                 new_obd->obd_minor = new_obd_minor;
538                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
539                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
540                 obd_devs[new_obd_minor] = new_obd;
541         } else {
542                 if (ret == 0) {
543                         ret = -EOVERFLOW;
544                         CERROR("%s: all %u/%u devices used, increase "
545                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
546                                i, class_devno_max(), ret);
547                 }
548         }
549         write_unlock(&obd_dev_lock);
550
551         RETURN(ret);
552 }
553
554 static int class_name2dev_nolock(const char *name)
555 {
556         int i;
557
558         if (!name)
559                 return -1;
560
561         for (i = 0; i < class_devno_max(); i++) {
562                 struct obd_device *obd = class_num2obd(i);
563
564                 if (obd && strcmp(name, obd->obd_name) == 0) {
565                         /* Make sure we finished attaching before we give
566                            out any references */
567                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
568                         if (obd->obd_attached) {
569                                 return i;
570                         }
571                         break;
572                 }
573         }
574
575         return -1;
576 }
577
578 int class_name2dev(const char *name)
579 {
580         int i;
581
582         if (!name)
583                 return -1;
584
585         read_lock(&obd_dev_lock);
586         i = class_name2dev_nolock(name);
587         read_unlock(&obd_dev_lock);
588
589         return i;
590 }
591 EXPORT_SYMBOL(class_name2dev);
592
593 struct obd_device *class_name2obd(const char *name)
594 {
595         int dev = class_name2dev(name);
596
597         if (dev < 0 || dev > class_devno_max())
598                 return NULL;
599         return class_num2obd(dev);
600 }
601 EXPORT_SYMBOL(class_name2obd);
602
603 int class_uuid2dev_nolock(struct obd_uuid *uuid)
604 {
605         int i;
606
607         for (i = 0; i < class_devno_max(); i++) {
608                 struct obd_device *obd = class_num2obd(i);
609
610                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
611                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
612                         return i;
613                 }
614         }
615
616         return -1;
617 }
618
619 int class_uuid2dev(struct obd_uuid *uuid)
620 {
621         int i;
622
623         read_lock(&obd_dev_lock);
624         i = class_uuid2dev_nolock(uuid);
625         read_unlock(&obd_dev_lock);
626
627         return i;
628 }
629 EXPORT_SYMBOL(class_uuid2dev);
630
631 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
632 {
633         int dev = class_uuid2dev(uuid);
634         if (dev < 0)
635                 return NULL;
636         return class_num2obd(dev);
637 }
638 EXPORT_SYMBOL(class_uuid2obd);
639
640 /**
641  * Get obd device from ::obd_devs[]
642  *
643  * \param num [in] array index
644  *
645  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
646  *         otherwise return the obd device there.
647  */
648 struct obd_device *class_num2obd(int num)
649 {
650         struct obd_device *obd = NULL;
651
652         if (num < class_devno_max()) {
653                 obd = obd_devs[num];
654                 if (obd == NULL)
655                         return NULL;
656
657                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
658                          "%p obd_magic %08x != %08x\n",
659                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
660                 LASSERTF(obd->obd_minor == num,
661                          "%p obd_minor %0d != %0d\n",
662                          obd, obd->obd_minor, num);
663         }
664
665         return obd;
666 }
667 EXPORT_SYMBOL(class_num2obd);
668
669 /**
670  * Find obd in obd_dev[] by name or uuid.
671  *
672  * Increment obd's refcount if found.
673  *
674  * \param[in] str obd name or uuid
675  *
676  * \retval NULL    if not found
677  * \retval target  pointer to found obd_device
678  */
679 struct obd_device *class_dev_by_str(const char *str)
680 {
681         struct obd_device *target = NULL;
682         struct obd_uuid tgtuuid;
683         int rc;
684
685         obd_str2uuid(&tgtuuid, str);
686
687         read_lock(&obd_dev_lock);
688         rc = class_uuid2dev_nolock(&tgtuuid);
689         if (rc < 0)
690                 rc = class_name2dev_nolock(str);
691
692         if (rc >= 0)
693                 target = class_num2obd(rc);
694
695         if (target != NULL)
696                 class_incref(target, "find", current);
697         read_unlock(&obd_dev_lock);
698
699         RETURN(target);
700 }
701 EXPORT_SYMBOL(class_dev_by_str);
702
703 /**
704  * Get obd devices count. Device in any
705  *    state are counted
706  * \retval obd device count
707  */
708 int get_devices_count(void)
709 {
710         int index, max_index = class_devno_max(), dev_count = 0;
711
712         read_lock(&obd_dev_lock);
713         for (index = 0; index <= max_index; index++) {
714                 struct obd_device *obd = class_num2obd(index);
715                 if (obd != NULL)
716                         dev_count++;
717         }
718         read_unlock(&obd_dev_lock);
719
720         return dev_count;
721 }
722 EXPORT_SYMBOL(get_devices_count);
723
724 void class_obd_list(void)
725 {
726         char *status;
727         int i;
728
729         read_lock(&obd_dev_lock);
730         for (i = 0; i < class_devno_max(); i++) {
731                 struct obd_device *obd = class_num2obd(i);
732
733                 if (obd == NULL)
734                         continue;
735                 if (obd->obd_stopping)
736                         status = "ST";
737                 else if (obd->obd_set_up)
738                         status = "UP";
739                 else if (obd->obd_attached)
740                         status = "AT";
741                 else
742                         status = "--";
743                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
744                          i, status, obd->obd_type->typ_name,
745                          obd->obd_name, obd->obd_uuid.uuid,
746                          atomic_read(&obd->obd_refcount));
747         }
748         read_unlock(&obd_dev_lock);
749 }
750
751 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
752  * specified, then only the client with that uuid is returned,
753  * otherwise any client connected to the tgt is returned.
754  */
755 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
756                                          const char *type_name,
757                                          struct obd_uuid *grp_uuid)
758 {
759         int i;
760
761         read_lock(&obd_dev_lock);
762         for (i = 0; i < class_devno_max(); i++) {
763                 struct obd_device *obd = class_num2obd(i);
764
765                 if (obd == NULL)
766                         continue;
767                 if ((strncmp(obd->obd_type->typ_name, type_name,
768                              strlen(type_name)) == 0)) {
769                         if (obd_uuid_equals(tgt_uuid,
770                                             &obd->u.cli.cl_target_uuid) &&
771                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
772                                                          &obd->obd_uuid) : 1)) {
773                                 read_unlock(&obd_dev_lock);
774                                 return obd;
775                         }
776                 }
777         }
778         read_unlock(&obd_dev_lock);
779
780         return NULL;
781 }
782 EXPORT_SYMBOL(class_find_client_obd);
783
784 /* Iterate the obd_device list looking devices have grp_uuid. Start
785  * searching at *next, and if a device is found, the next index to look
786  * at is saved in *next. If next is NULL, then the first matching device
787  * will always be returned.
788  */
789 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
790 {
791         int i;
792
793         if (next == NULL)
794                 i = 0;
795         else if (*next >= 0 && *next < class_devno_max())
796                 i = *next;
797         else
798                 return NULL;
799
800         read_lock(&obd_dev_lock);
801         for (; i < class_devno_max(); i++) {
802                 struct obd_device *obd = class_num2obd(i);
803
804                 if (obd == NULL)
805                         continue;
806                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
807                         if (next != NULL)
808                                 *next = i+1;
809                         read_unlock(&obd_dev_lock);
810                         return obd;
811                 }
812         }
813         read_unlock(&obd_dev_lock);
814
815         return NULL;
816 }
817 EXPORT_SYMBOL(class_devices_in_group);
818
819 /**
820  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
821  * adjust sptlrpc settings accordingly.
822  */
823 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
824 {
825         struct obd_device  *obd;
826         const char         *type;
827         int                 i, rc = 0, rc2;
828
829         LASSERT(namelen > 0);
830
831         read_lock(&obd_dev_lock);
832         for (i = 0; i < class_devno_max(); i++) {
833                 obd = class_num2obd(i);
834
835                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
836                         continue;
837
838                 /* only notify mdc, osc, osp, lwp, mdt, ost
839                  * because only these have a -sptlrpc llog */
840                 type = obd->obd_type->typ_name;
841                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
842                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
843                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
844                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
845                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
846                     strcmp(type, LUSTRE_OST_NAME) != 0)
847                         continue;
848
849                 if (strncmp(obd->obd_name, fsname, namelen))
850                         continue;
851
852                 class_incref(obd, __FUNCTION__, obd);
853                 read_unlock(&obd_dev_lock);
854                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
855                                          sizeof(KEY_SPTLRPC_CONF),
856                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
857                 rc = rc ? rc : rc2;
858                 class_decref(obd, __FUNCTION__, obd);
859                 read_lock(&obd_dev_lock);
860         }
861         read_unlock(&obd_dev_lock);
862         return rc;
863 }
864 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
865
866 void obd_cleanup_caches(void)
867 {
868         ENTRY;
869         if (obd_device_cachep) {
870                 kmem_cache_destroy(obd_device_cachep);
871                 obd_device_cachep = NULL;
872         }
873
874         EXIT;
875 }
876
877 int obd_init_caches(void)
878 {
879         int rc;
880         ENTRY;
881
882         LASSERT(obd_device_cachep == NULL);
883         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
884                                 sizeof(struct obd_device),
885                                 0, 0, 0, sizeof(struct obd_device), NULL);
886         if (!obd_device_cachep)
887                 GOTO(out, rc = -ENOMEM);
888
889         RETURN(0);
890 out:
891         obd_cleanup_caches();
892         RETURN(rc);
893 }
894
895 static const char export_handle_owner[] = "export";
896
897 /* map connection to client */
898 struct obd_export *class_conn2export(struct lustre_handle *conn)
899 {
900         struct obd_export *export;
901         ENTRY;
902
903         if (!conn) {
904                 CDEBUG(D_CACHE, "looking for null handle\n");
905                 RETURN(NULL);
906         }
907
908         if (conn->cookie == -1) {  /* this means assign a new connection */
909                 CDEBUG(D_CACHE, "want a new connection\n");
910                 RETURN(NULL);
911         }
912
913         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
914         export = class_handle2object(conn->cookie, export_handle_owner);
915         RETURN(export);
916 }
917 EXPORT_SYMBOL(class_conn2export);
918
919 struct obd_device *class_exp2obd(struct obd_export *exp)
920 {
921         if (exp)
922                 return exp->exp_obd;
923         return NULL;
924 }
925 EXPORT_SYMBOL(class_exp2obd);
926
927 struct obd_import *class_exp2cliimp(struct obd_export *exp)
928 {
929         struct obd_device *obd = exp->exp_obd;
930         if (obd == NULL)
931                 return NULL;
932         return obd->u.cli.cl_import;
933 }
934 EXPORT_SYMBOL(class_exp2cliimp);
935
936 /* Export management functions */
937 static void class_export_destroy(struct obd_export *exp)
938 {
939         struct obd_device *obd = exp->exp_obd;
940         ENTRY;
941
942         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
943         LASSERT(obd != NULL);
944
945         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
946                exp->exp_client_uuid.uuid, obd->obd_name);
947
948         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
949         ptlrpc_connection_put(exp->exp_connection);
950
951         LASSERT(list_empty(&exp->exp_outstanding_replies));
952         LASSERT(list_empty(&exp->exp_uncommitted_replies));
953         LASSERT(list_empty(&exp->exp_req_replay_queue));
954         LASSERT(list_empty(&exp->exp_hp_rpcs));
955         obd_destroy_export(exp);
956         /* self export doesn't hold a reference to an obd, although it
957          * exists until freeing of the obd */
958         if (exp != obd->obd_self_export)
959                 class_decref(obd, "export", exp);
960
961         OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
962         kfree_rcu(exp, exp_handle.h_rcu);
963         EXIT;
964 }
965
966 struct obd_export *class_export_get(struct obd_export *exp)
967 {
968         refcount_inc(&exp->exp_handle.h_ref);
969         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
970                refcount_read(&exp->exp_handle.h_ref));
971         return exp;
972 }
973 EXPORT_SYMBOL(class_export_get);
974
975 void class_export_put(struct obd_export *exp)
976 {
977         LASSERT(exp != NULL);
978         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
979         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
980         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
981                refcount_read(&exp->exp_handle.h_ref) - 1);
982
983         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
984                 struct obd_device *obd = exp->exp_obd;
985
986                 CDEBUG(D_IOCTL, "final put %p/%s\n",
987                        exp, exp->exp_client_uuid.uuid);
988
989                 /* release nid stat refererence */
990                 lprocfs_exp_cleanup(exp);
991
992                 if (exp == obd->obd_self_export) {
993                         /* self export should be destroyed without
994                          * zombie thread as it doesn't hold a
995                          * reference to obd and doesn't hold any
996                          * resources */
997                         class_export_destroy(exp);
998                         /* self export is destroyed, no class
999                          * references exist and it is safe to free
1000                          * obd */
1001                         class_free_dev(obd);
1002                 } else {
1003                         LASSERT(!list_empty(&exp->exp_obd_chain));
1004                         obd_zombie_export_add(exp);
1005                 }
1006
1007         }
1008 }
1009 EXPORT_SYMBOL(class_export_put);
1010
1011 static void obd_zombie_exp_cull(struct work_struct *ws)
1012 {
1013         struct obd_export *export;
1014
1015         export = container_of(ws, struct obd_export, exp_zombie_work);
1016         class_export_destroy(export);
1017 }
1018
1019 /* Creates a new export, adds it to the hash table, and returns a
1020  * pointer to it. The refcount is 2: one for the hash reference, and
1021  * one for the pointer returned by this function. */
1022 struct obd_export *__class_new_export(struct obd_device *obd,
1023                                       struct obd_uuid *cluuid, bool is_self)
1024 {
1025         struct obd_export *export;
1026         int rc = 0;
1027         ENTRY;
1028
1029         OBD_ALLOC_PTR(export);
1030         if (!export)
1031                 return ERR_PTR(-ENOMEM);
1032
1033         export->exp_conn_cnt = 0;
1034         export->exp_lock_hash = NULL;
1035         export->exp_flock_hash = NULL;
1036         /* 2 = class_handle_hash + last */
1037         refcount_set(&export->exp_handle.h_ref, 2);
1038         atomic_set(&export->exp_rpc_count, 0);
1039         atomic_set(&export->exp_cb_count, 0);
1040         atomic_set(&export->exp_locks_count, 0);
1041 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1042         INIT_LIST_HEAD(&export->exp_locks_list);
1043         spin_lock_init(&export->exp_locks_list_guard);
1044 #endif
1045         atomic_set(&export->exp_replay_count, 0);
1046         export->exp_obd = obd;
1047         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1048         spin_lock_init(&export->exp_uncommitted_replies_lock);
1049         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1050         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1051         INIT_HLIST_NODE(&export->exp_handle.h_link);
1052         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1053         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1054         class_handle_hash(&export->exp_handle, export_handle_owner);
1055         export->exp_last_request_time = ktime_get_real_seconds();
1056         spin_lock_init(&export->exp_lock);
1057         spin_lock_init(&export->exp_rpc_lock);
1058         INIT_HLIST_NODE(&export->exp_gen_hash);
1059         spin_lock_init(&export->exp_bl_list_lock);
1060         INIT_LIST_HEAD(&export->exp_bl_list);
1061         INIT_LIST_HEAD(&export->exp_stale_list);
1062         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1063
1064         export->exp_sp_peer = LUSTRE_SP_ANY;
1065         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1066         export->exp_client_uuid = *cluuid;
1067         obd_init_export(export);
1068
1069         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1070
1071         spin_lock(&obd->obd_dev_lock);
1072         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1073                 /* shouldn't happen, but might race */
1074                 if (obd->obd_stopping)
1075                         GOTO(exit_unlock, rc = -ENODEV);
1076
1077                 rc = obd_uuid_add(obd, export);
1078                 if (rc != 0) {
1079                         LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1080                                       obd->obd_name, cluuid->uuid, rc);
1081                         GOTO(exit_unlock, rc = -EALREADY);
1082                 }
1083         }
1084
1085         if (!is_self) {
1086                 class_incref(obd, "export", export);
1087                 list_add_tail(&export->exp_obd_chain_timed,
1088                               &obd->obd_exports_timed);
1089                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1090                 obd->obd_num_exports++;
1091         } else {
1092                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1093                 INIT_LIST_HEAD(&export->exp_obd_chain);
1094         }
1095         spin_unlock(&obd->obd_dev_lock);
1096         RETURN(export);
1097
1098 exit_unlock:
1099         spin_unlock(&obd->obd_dev_lock);
1100         class_handle_unhash(&export->exp_handle);
1101         obd_destroy_export(export);
1102         OBD_FREE_PTR(export);
1103         return ERR_PTR(rc);
1104 }
1105
1106 struct obd_export *class_new_export(struct obd_device *obd,
1107                                     struct obd_uuid *uuid)
1108 {
1109         return __class_new_export(obd, uuid, false);
1110 }
1111 EXPORT_SYMBOL(class_new_export);
1112
1113 struct obd_export *class_new_export_self(struct obd_device *obd,
1114                                          struct obd_uuid *uuid)
1115 {
1116         return __class_new_export(obd, uuid, true);
1117 }
1118
1119 void class_unlink_export(struct obd_export *exp)
1120 {
1121         class_handle_unhash(&exp->exp_handle);
1122
1123         if (exp->exp_obd->obd_self_export == exp) {
1124                 class_export_put(exp);
1125                 return;
1126         }
1127
1128         spin_lock(&exp->exp_obd->obd_dev_lock);
1129         /* delete an uuid-export hashitem from hashtables */
1130         if (exp != exp->exp_obd->obd_self_export)
1131                 obd_uuid_del(exp->exp_obd, exp);
1132
1133 #ifdef HAVE_SERVER_SUPPORT
1134         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1135                 struct tg_export_data   *ted = &exp->exp_target_data;
1136                 struct cfs_hash         *hash;
1137
1138                 /* Because obd_gen_hash will not be released until
1139                  * class_cleanup(), so hash should never be NULL here */
1140                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1141                 LASSERT(hash != NULL);
1142                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1143                              &exp->exp_gen_hash);
1144                 cfs_hash_putref(hash);
1145         }
1146 #endif /* HAVE_SERVER_SUPPORT */
1147
1148         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1149         list_del_init(&exp->exp_obd_chain_timed);
1150         exp->exp_obd->obd_num_exports--;
1151         spin_unlock(&exp->exp_obd->obd_dev_lock);
1152         atomic_inc(&obd_stale_export_num);
1153
1154         /* A reference is kept by obd_stale_exports list */
1155         obd_stale_export_put(exp);
1156 }
1157 EXPORT_SYMBOL(class_unlink_export);
1158
1159 /* Import management functions */
1160 static void obd_zombie_import_free(struct obd_import *imp)
1161 {
1162         struct obd_import_conn *imp_conn;
1163
1164         ENTRY;
1165         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1166                imp->imp_obd->obd_name);
1167
1168         LASSERT(refcount_read(&imp->imp_refcount) == 0);
1169
1170         ptlrpc_connection_put(imp->imp_connection);
1171
1172         while ((imp_conn = list_first_entry_or_null(&imp->imp_conn_list,
1173                                                     struct obd_import_conn,
1174                                                     oic_item)) != NULL) {
1175                 list_del_init(&imp_conn->oic_item);
1176                 ptlrpc_connection_put(imp_conn->oic_conn);
1177                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1178         }
1179
1180         LASSERT(imp->imp_sec == NULL);
1181         LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1182                  imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1183         class_decref(imp->imp_obd, "import", imp);
1184         OBD_FREE_PTR(imp);
1185         EXIT;
1186 }
1187
1188 struct obd_import *class_import_get(struct obd_import *import)
1189 {
1190         refcount_inc(&import->imp_refcount);
1191         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1192                refcount_read(&import->imp_refcount),
1193                import->imp_obd->obd_name);
1194         return import;
1195 }
1196 EXPORT_SYMBOL(class_import_get);
1197
1198 void class_import_put(struct obd_import *imp)
1199 {
1200         ENTRY;
1201
1202         LASSERT(refcount_read(&imp->imp_refcount) > 0);
1203
1204         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1205                refcount_read(&imp->imp_refcount) - 1,
1206                imp->imp_obd->obd_name);
1207
1208         if (refcount_dec_and_test(&imp->imp_refcount)) {
1209                 CDEBUG(D_INFO, "final put import %p\n", imp);
1210                 obd_zombie_import_add(imp);
1211         }
1212
1213         EXIT;
1214 }
1215 EXPORT_SYMBOL(class_import_put);
1216
1217 static void init_imp_at(struct imp_at *at) {
1218         int i;
1219         at_init(&at->iat_net_latency, 0, 0);
1220         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1221                 /* max service estimates are tracked on the server side, so
1222                    don't use the AT history here, just use the last reported
1223                    val. (But keep hist for proc histogram, worst_ever) */
1224                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1225                         AT_FLG_NOHIST);
1226         }
1227 }
1228
1229 static void obd_zombie_imp_cull(struct work_struct *ws)
1230 {
1231         struct obd_import *import;
1232
1233         import = container_of(ws, struct obd_import, imp_zombie_work);
1234         obd_zombie_import_free(import);
1235 }
1236
1237 struct obd_import *class_new_import(struct obd_device *obd)
1238 {
1239         struct obd_import *imp;
1240         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1241
1242         OBD_ALLOC(imp, sizeof(*imp));
1243         if (imp == NULL)
1244                 return NULL;
1245
1246         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1247         INIT_LIST_HEAD(&imp->imp_replay_list);
1248         INIT_LIST_HEAD(&imp->imp_sending_list);
1249         INIT_LIST_HEAD(&imp->imp_delayed_list);
1250         INIT_LIST_HEAD(&imp->imp_committed_list);
1251         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1252         imp->imp_known_replied_xid = 0;
1253         imp->imp_replay_cursor = &imp->imp_committed_list;
1254         spin_lock_init(&imp->imp_lock);
1255         imp->imp_last_success_conn = 0;
1256         imp->imp_state = LUSTRE_IMP_NEW;
1257         imp->imp_obd = class_incref(obd, "import", imp);
1258         rwlock_init(&imp->imp_sec_lock);
1259         init_waitqueue_head(&imp->imp_recovery_waitq);
1260         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1261
1262         if (curr_pid_ns && curr_pid_ns->child_reaper)
1263                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1264         else
1265                 imp->imp_sec_refpid = 1;
1266
1267         refcount_set(&imp->imp_refcount, 2);
1268         atomic_set(&imp->imp_unregistering, 0);
1269         atomic_set(&imp->imp_reqs, 0);
1270         atomic_set(&imp->imp_inflight, 0);
1271         atomic_set(&imp->imp_replay_inflight, 0);
1272         init_waitqueue_head(&imp->imp_replay_waitq);
1273         atomic_set(&imp->imp_inval_count, 0);
1274         INIT_LIST_HEAD(&imp->imp_conn_list);
1275         init_imp_at(&imp->imp_at);
1276
1277         /* the default magic is V2, will be used in connect RPC, and
1278          * then adjusted according to the flags in request/reply. */
1279         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1280
1281         return imp;
1282 }
1283 EXPORT_SYMBOL(class_new_import);
1284
1285 void class_destroy_import(struct obd_import *import)
1286 {
1287         LASSERT(import != NULL);
1288         LASSERT(import != LP_POISON);
1289
1290         spin_lock(&import->imp_lock);
1291         import->imp_generation++;
1292         spin_unlock(&import->imp_lock);
1293         class_import_put(import);
1294 }
1295 EXPORT_SYMBOL(class_destroy_import);
1296
1297 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1298
1299 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1300 {
1301         spin_lock(&exp->exp_locks_list_guard);
1302
1303         LASSERT(lock->l_exp_refs_nr >= 0);
1304
1305         if (lock->l_exp_refs_target != NULL &&
1306             lock->l_exp_refs_target != exp) {
1307                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1308                               exp, lock, lock->l_exp_refs_target);
1309         }
1310         if ((lock->l_exp_refs_nr ++) == 0) {
1311                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1312                 lock->l_exp_refs_target = exp;
1313         }
1314         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1315                lock, exp, lock->l_exp_refs_nr);
1316         spin_unlock(&exp->exp_locks_list_guard);
1317 }
1318 EXPORT_SYMBOL(__class_export_add_lock_ref);
1319
1320 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1321 {
1322         spin_lock(&exp->exp_locks_list_guard);
1323         LASSERT(lock->l_exp_refs_nr > 0);
1324         if (lock->l_exp_refs_target != exp) {
1325                 LCONSOLE_WARN("lock %p, "
1326                               "mismatching export pointers: %p, %p\n",
1327                               lock, lock->l_exp_refs_target, exp);
1328         }
1329         if (-- lock->l_exp_refs_nr == 0) {
1330                 list_del_init(&lock->l_exp_refs_link);
1331                 lock->l_exp_refs_target = NULL;
1332         }
1333         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1334                lock, exp, lock->l_exp_refs_nr);
1335         spin_unlock(&exp->exp_locks_list_guard);
1336 }
1337 EXPORT_SYMBOL(__class_export_del_lock_ref);
1338 #endif
1339
1340 /* A connection defines an export context in which preallocation can
1341    be managed. This releases the export pointer reference, and returns
1342    the export handle, so the export refcount is 1 when this function
1343    returns. */
1344 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1345                   struct obd_uuid *cluuid)
1346 {
1347         struct obd_export *export;
1348         LASSERT(conn != NULL);
1349         LASSERT(obd != NULL);
1350         LASSERT(cluuid != NULL);
1351         ENTRY;
1352
1353         export = class_new_export(obd, cluuid);
1354         if (IS_ERR(export))
1355                 RETURN(PTR_ERR(export));
1356
1357         conn->cookie = export->exp_handle.h_cookie;
1358         class_export_put(export);
1359
1360         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1361                cluuid->uuid, conn->cookie);
1362         RETURN(0);
1363 }
1364 EXPORT_SYMBOL(class_connect);
1365
1366 /* if export is involved in recovery then clean up related things */
1367 static void class_export_recovery_cleanup(struct obd_export *exp)
1368 {
1369         struct obd_device *obd = exp->exp_obd;
1370
1371         spin_lock(&obd->obd_recovery_task_lock);
1372         if (obd->obd_recovering) {
1373                 if (exp->exp_in_recovery) {
1374                         spin_lock(&exp->exp_lock);
1375                         exp->exp_in_recovery = 0;
1376                         spin_unlock(&exp->exp_lock);
1377                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1378                         atomic_dec(&obd->obd_connected_clients);
1379                 }
1380
1381                 /* if called during recovery then should update
1382                  * obd_stale_clients counter,
1383                  * lightweight exports are not counted */
1384                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1385                         exp->exp_obd->obd_stale_clients++;
1386         }
1387         spin_unlock(&obd->obd_recovery_task_lock);
1388
1389         spin_lock(&exp->exp_lock);
1390         /** Cleanup req replay fields */
1391         if (exp->exp_req_replay_needed) {
1392                 exp->exp_req_replay_needed = 0;
1393
1394                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1395                 atomic_dec(&obd->obd_req_replay_clients);
1396         }
1397
1398         /** Cleanup lock replay data */
1399         if (exp->exp_lock_replay_needed) {
1400                 exp->exp_lock_replay_needed = 0;
1401
1402                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1403                 atomic_dec(&obd->obd_lock_replay_clients);
1404         }
1405         spin_unlock(&exp->exp_lock);
1406 }
1407
1408 /* This function removes 1-3 references from the export:
1409  * 1 - for export pointer passed
1410  * and if disconnect really need
1411  * 2 - removing from hash
1412  * 3 - in client_unlink_export
1413  * The export pointer passed to this function can destroyed */
1414 int class_disconnect(struct obd_export *export)
1415 {
1416         int already_disconnected;
1417         ENTRY;
1418
1419         if (export == NULL) {
1420                 CWARN("attempting to free NULL export %p\n", export);
1421                 RETURN(-EINVAL);
1422         }
1423
1424         spin_lock(&export->exp_lock);
1425         already_disconnected = export->exp_disconnected;
1426         export->exp_disconnected = 1;
1427 #ifdef HAVE_SERVER_SUPPORT
1428         /*  We hold references of export for uuid hash
1429          *  and nid_hash and export link at least. So
1430          *  it is safe to call rh*table_remove_fast in
1431          *  there.
1432          */
1433         obd_nid_del(export->exp_obd, export);
1434 #endif /* HAVE_SERVER_SUPPORT */
1435         spin_unlock(&export->exp_lock);
1436
1437         /* class_cleanup(), abort_recovery(), and class_fail_export()
1438          * all end up in here, and if any of them race we shouldn't
1439          * call extra class_export_puts(). */
1440         if (already_disconnected)
1441                 GOTO(no_disconn, already_disconnected);
1442
1443         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1444                export->exp_handle.h_cookie);
1445
1446         class_export_recovery_cleanup(export);
1447         class_unlink_export(export);
1448 no_disconn:
1449         class_export_put(export);
1450         RETURN(0);
1451 }
1452 EXPORT_SYMBOL(class_disconnect);
1453
1454 /* Return non-zero for a fully connected export */
1455 int class_connected_export(struct obd_export *exp)
1456 {
1457         int connected = 0;
1458
1459         if (exp) {
1460                 spin_lock(&exp->exp_lock);
1461                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1462                 spin_unlock(&exp->exp_lock);
1463         }
1464         return connected;
1465 }
1466 EXPORT_SYMBOL(class_connected_export);
1467
1468 static void class_disconnect_export_list(struct list_head *list,
1469                                          enum obd_option flags)
1470 {
1471         int rc;
1472         struct obd_export *exp;
1473         ENTRY;
1474
1475         /* It's possible that an export may disconnect itself, but
1476          * nothing else will be added to this list.
1477          */
1478         while ((exp = list_first_entry_or_null(list, struct obd_export,
1479                                                exp_obd_chain)) != NULL) {
1480                 /* need for safe call CDEBUG after obd_disconnect */
1481                 class_export_get(exp);
1482
1483                 spin_lock(&exp->exp_lock);
1484                 exp->exp_flags = flags;
1485                 spin_unlock(&exp->exp_lock);
1486
1487                 if (obd_uuid_equals(&exp->exp_client_uuid,
1488                                     &exp->exp_obd->obd_uuid)) {
1489                         CDEBUG(D_HA,
1490                                "exp %p export uuid == obd uuid, don't discon\n",
1491                                exp);
1492                         /* Need to delete this now so we don't end up pointing
1493                          * to work_list later when this export is cleaned up. */
1494                         list_del_init(&exp->exp_obd_chain);
1495                         class_export_put(exp);
1496                         continue;
1497                 }
1498
1499                 class_export_get(exp);
1500                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1501                        "last request at %lld\n",
1502                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1503                        exp, exp->exp_last_request_time);
1504                 /* release one export reference anyway */
1505                 rc = obd_disconnect(exp);
1506
1507                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1508                        obd_export_nid2str(exp), exp, rc);
1509                 class_export_put(exp);
1510         }
1511         EXIT;
1512 }
1513
1514 void class_disconnect_exports(struct obd_device *obd)
1515 {
1516         LIST_HEAD(work_list);
1517         ENTRY;
1518
1519         /* Move all of the exports from obd_exports to a work list, en masse. */
1520         spin_lock(&obd->obd_dev_lock);
1521         list_splice_init(&obd->obd_exports, &work_list);
1522         list_splice_init(&obd->obd_delayed_exports, &work_list);
1523         spin_unlock(&obd->obd_dev_lock);
1524
1525         if (!list_empty(&work_list)) {
1526                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1527                        "disconnecting them\n", obd->obd_minor, obd);
1528                 class_disconnect_export_list(&work_list,
1529                                              exp_flags_from_obd(obd));
1530         } else
1531                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1532                        obd->obd_minor, obd);
1533         EXIT;
1534 }
1535 EXPORT_SYMBOL(class_disconnect_exports);
1536
1537 /* Remove exports that have not completed recovery.
1538  */
1539 void class_disconnect_stale_exports(struct obd_device *obd,
1540                                     int (*test_export)(struct obd_export *))
1541 {
1542         LIST_HEAD(work_list);
1543         struct obd_export *exp, *n;
1544         int evicted = 0;
1545         ENTRY;
1546
1547         spin_lock(&obd->obd_dev_lock);
1548         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1549                                  exp_obd_chain) {
1550                 /* don't count self-export as client */
1551                 if (obd_uuid_equals(&exp->exp_client_uuid,
1552                                     &exp->exp_obd->obd_uuid))
1553                         continue;
1554
1555                 /* don't evict clients which have no slot in last_rcvd
1556                  * (e.g. lightweight connection) */
1557                 if (exp->exp_target_data.ted_lr_idx == -1)
1558                         continue;
1559
1560                 spin_lock(&exp->exp_lock);
1561                 if (exp->exp_failed || test_export(exp)) {
1562                         spin_unlock(&exp->exp_lock);
1563                         continue;
1564                 }
1565                 exp->exp_failed = 1;
1566                 spin_unlock(&exp->exp_lock);
1567
1568                 list_move(&exp->exp_obd_chain, &work_list);
1569                 evicted++;
1570                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1571                        obd->obd_name, exp->exp_client_uuid.uuid,
1572                        obd_export_nid2str(exp));
1573                 print_export_data(exp, "EVICTING", 0, D_HA);
1574         }
1575         spin_unlock(&obd->obd_dev_lock);
1576
1577         if (evicted)
1578                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1579                               obd->obd_name, evicted);
1580
1581         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1582                                                  OBD_OPT_ABORT_RECOV);
1583         EXIT;
1584 }
1585 EXPORT_SYMBOL(class_disconnect_stale_exports);
1586
1587 void class_fail_export(struct obd_export *exp)
1588 {
1589         int rc, already_failed;
1590
1591         spin_lock(&exp->exp_lock);
1592         already_failed = exp->exp_failed;
1593         exp->exp_failed = 1;
1594         spin_unlock(&exp->exp_lock);
1595
1596         if (already_failed) {
1597                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1598                        exp, exp->exp_client_uuid.uuid);
1599                 return;
1600         }
1601
1602         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1603                exp, exp->exp_client_uuid.uuid);
1604
1605         if (obd_dump_on_timeout)
1606                 libcfs_debug_dumplog();
1607
1608         /* need for safe call CDEBUG after obd_disconnect */
1609         class_export_get(exp);
1610
1611         /* Most callers into obd_disconnect are removing their own reference
1612          * (request, for example) in addition to the one from the hash table.
1613          * We don't have such a reference here, so make one. */
1614         class_export_get(exp);
1615         rc = obd_disconnect(exp);
1616         if (rc)
1617                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1618         else
1619                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1620                        exp, exp->exp_client_uuid.uuid);
1621         class_export_put(exp);
1622 }
1623 EXPORT_SYMBOL(class_fail_export);
1624
1625 #ifdef HAVE_SERVER_SUPPORT
1626
1627 static int take_first(struct obd_export *exp, void *data)
1628 {
1629         struct obd_export **expp = data;
1630
1631         if (*expp)
1632                 /* already have one */
1633                 return 0;
1634         if (exp->exp_failed)
1635                 /* Don't want this one */
1636                 return 0;
1637         if (!refcount_inc_not_zero(&exp->exp_handle.h_ref))
1638                 /* Cannot get a ref on this one */
1639                 return 0;
1640         *expp = exp;
1641         return 1;
1642 }
1643
1644 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1645 {
1646         struct lnet_nid nid_key;
1647         struct obd_export *doomed_exp;
1648         int exports_evicted = 0;
1649
1650         libcfs_strnid(&nid_key, nid);
1651
1652         spin_lock(&obd->obd_dev_lock);
1653         /* umount has run already, so evict thread should leave
1654          * its task to umount thread now */
1655         if (obd->obd_stopping) {
1656                 spin_unlock(&obd->obd_dev_lock);
1657                 return exports_evicted;
1658         }
1659         spin_unlock(&obd->obd_dev_lock);
1660
1661         doomed_exp = NULL;
1662         while (obd_nid_export_for_each(obd, &nid_key,
1663                                        take_first, &doomed_exp) > 0) {
1664
1665                 LASSERTF(doomed_exp != obd->obd_self_export,
1666                          "self-export is hashed by NID?\n");
1667
1668                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1669                               obd->obd_name,
1670                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1671                               obd_export_nid2str(doomed_exp));
1672
1673                 class_fail_export(doomed_exp);
1674                 class_export_put(doomed_exp);
1675                 exports_evicted++;
1676                 doomed_exp = NULL;
1677         }
1678
1679         if (!exports_evicted)
1680                 CDEBUG(D_HA,
1681                        "%s: can't disconnect NID '%s': no exports found\n",
1682                        obd->obd_name, nid);
1683         return exports_evicted;
1684 }
1685 EXPORT_SYMBOL(obd_export_evict_by_nid);
1686
1687 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1688 {
1689         struct obd_export *doomed_exp = NULL;
1690         struct obd_uuid doomed_uuid;
1691         int exports_evicted = 0;
1692
1693         spin_lock(&obd->obd_dev_lock);
1694         if (obd->obd_stopping) {
1695                 spin_unlock(&obd->obd_dev_lock);
1696                 return exports_evicted;
1697         }
1698         spin_unlock(&obd->obd_dev_lock);
1699
1700         obd_str2uuid(&doomed_uuid, uuid);
1701         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1702                 CERROR("%s: can't evict myself\n", obd->obd_name);
1703                 return exports_evicted;
1704         }
1705
1706         doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1707         if (doomed_exp == NULL) {
1708                 CERROR("%s: can't disconnect %s: no exports found\n",
1709                        obd->obd_name, uuid);
1710         } else {
1711                 CWARN("%s: evicting %s at adminstrative request\n",
1712                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1713                 class_fail_export(doomed_exp);
1714                 class_export_put(doomed_exp);
1715                 obd_uuid_del(obd, doomed_exp);
1716                 exports_evicted++;
1717         }
1718
1719         return exports_evicted;
1720 }
1721 #endif /* HAVE_SERVER_SUPPORT */
1722
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump callback; unset (NULL) until a user installs it. */
void (*class_export_dump_hook)(struct obd_export *) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1727
1728 static void print_export_data(struct obd_export *exp, const char *status,
1729                               int locks, int debug_level)
1730 {
1731         struct ptlrpc_reply_state *rs;
1732         struct ptlrpc_reply_state *first_reply = NULL;
1733         int nreplies = 0;
1734
1735         spin_lock(&exp->exp_lock);
1736         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1737                             rs_exp_list) {
1738                 if (nreplies == 0)
1739                         first_reply = rs;
1740                 nreplies++;
1741         }
1742         spin_unlock(&exp->exp_lock);
1743
1744         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1745                "%p %s %llu stale:%d\n",
1746                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1747                obd_export_nid2str(exp),
1748                refcount_read(&exp->exp_handle.h_ref),
1749                atomic_read(&exp->exp_rpc_count),
1750                atomic_read(&exp->exp_cb_count),
1751                atomic_read(&exp->exp_locks_count),
1752                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1753                nreplies, first_reply, nreplies > 3 ? "..." : "",
1754                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1755 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1756         if (locks && class_export_dump_hook != NULL)
1757                 class_export_dump_hook(exp);
1758 #endif
1759 }
1760
1761 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1762 {
1763         struct obd_export *exp;
1764
1765         spin_lock(&obd->obd_dev_lock);
1766         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1767                 print_export_data(exp, "ACTIVE", locks, debug_level);
1768         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1769                 print_export_data(exp, "UNLINKED", locks, debug_level);
1770         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1771                 print_export_data(exp, "DELAYED", locks, debug_level);
1772         spin_unlock(&obd->obd_dev_lock);
1773 }
1774
1775 void obd_exports_barrier(struct obd_device *obd)
1776 {
1777         int waited = 2;
1778         LASSERT(list_empty(&obd->obd_exports));
1779         spin_lock(&obd->obd_dev_lock);
1780         while (!list_empty(&obd->obd_unlinked_exports)) {
1781                 spin_unlock(&obd->obd_dev_lock);
1782                 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1783                 if (waited > 5 && is_power_of_2(waited)) {
1784                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1785                                       "more than %d seconds. "
1786                                       "The obd refcount = %d. Is it stuck?\n",
1787                                       obd->obd_name, waited,
1788                                       atomic_read(&obd->obd_refcount));
1789                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1790                 }
1791                 waited *= 2;
1792                 spin_lock(&obd->obd_dev_lock);
1793         }
1794         spin_unlock(&obd->obd_dev_lock);
1795 }
1796 EXPORT_SYMBOL(obd_exports_barrier);
1797
1798 /**
1799  * Add export to the obd_zombe thread and notify it.
1800  */
1801 static void obd_zombie_export_add(struct obd_export *exp) {
1802         atomic_dec(&obd_stale_export_num);
1803         spin_lock(&exp->exp_obd->obd_dev_lock);
1804         LASSERT(!list_empty(&exp->exp_obd_chain));
1805         list_del_init(&exp->exp_obd_chain);
1806         spin_unlock(&exp->exp_obd->obd_dev_lock);
1807
1808         queue_work(zombie_wq, &exp->exp_zombie_work);
1809 }
1810
1811 /**
1812  * Add import to the obd_zombe thread and notify it.
1813  */
1814 static void obd_zombie_import_add(struct obd_import *imp) {
1815         LASSERT(imp->imp_sec == NULL);
1816
1817         queue_work(zombie_wq, &imp->imp_zombie_work);
1818 }
1819
1820 /**
1821  * wait when obd_zombie import/export queues become empty
1822  */
1823 void obd_zombie_barrier(void)
1824 {
1825         flush_workqueue(zombie_wq);
1826 }
1827 EXPORT_SYMBOL(obd_zombie_barrier);
1828
1829
1830 struct obd_export *obd_stale_export_get(void)
1831 {
1832         struct obd_export *exp = NULL;
1833         ENTRY;
1834
1835         spin_lock(&obd_stale_export_lock);
1836         if (!list_empty(&obd_stale_exports)) {
1837                 exp = list_first_entry(&obd_stale_exports,
1838                                        struct obd_export, exp_stale_list);
1839                 list_del_init(&exp->exp_stale_list);
1840         }
1841         spin_unlock(&obd_stale_export_lock);
1842
1843         if (exp) {
1844                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1845                        atomic_read(&obd_stale_export_num));
1846         }
1847         RETURN(exp);
1848 }
1849 EXPORT_SYMBOL(obd_stale_export_get);
1850
1851 void obd_stale_export_put(struct obd_export *exp)
1852 {
1853         ENTRY;
1854
1855         LASSERT(list_empty(&exp->exp_stale_list));
1856         if (exp->exp_lock_hash &&
1857             atomic_read(&exp->exp_lock_hash->hs_count)) {
1858                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1859                        atomic_read(&obd_stale_export_num));
1860
1861                 spin_lock_bh(&exp->exp_bl_list_lock);
1862                 spin_lock(&obd_stale_export_lock);
1863                 /* Add to the tail if there is no blocked locks,
1864                  * to the head otherwise. */
1865                 if (list_empty(&exp->exp_bl_list))
1866                         list_add_tail(&exp->exp_stale_list,
1867                                       &obd_stale_exports);
1868                 else
1869                         list_add(&exp->exp_stale_list,
1870                                  &obd_stale_exports);
1871
1872                 spin_unlock(&obd_stale_export_lock);
1873                 spin_unlock_bh(&exp->exp_bl_list_lock);
1874         } else {
1875                 class_export_put(exp);
1876         }
1877         EXIT;
1878 }
1879 EXPORT_SYMBOL(obd_stale_export_put);
1880
1881 /**
1882  * Adjust the position of the export in the stale list,
1883  * i.e. move to the head of the list if is needed.
1884  **/
1885 void obd_stale_export_adjust(struct obd_export *exp)
1886 {
1887         LASSERT(exp != NULL);
1888         spin_lock_bh(&exp->exp_bl_list_lock);
1889         spin_lock(&obd_stale_export_lock);
1890
1891         if (!list_empty(&exp->exp_stale_list) &&
1892             !list_empty(&exp->exp_bl_list))
1893                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1894
1895         spin_unlock(&obd_stale_export_lock);
1896         spin_unlock_bh(&exp->exp_bl_list_lock);
1897 }
1898 EXPORT_SYMBOL(obd_stale_export_adjust);
1899
1900 /**
1901  * start destroy zombie import/export thread
1902  */
1903 int obd_zombie_impexp_init(void)
1904 {
1905         zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
1906                                            0, CFS_CPT_ANY,
1907                                            cfs_cpt_number(cfs_cpt_tab));
1908
1909         return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
1910 }
1911
1912 /**
1913  * stop destroy zombie import/export thread
1914  */
1915 void obd_zombie_impexp_stop(void)
1916 {
1917         destroy_workqueue(zombie_wq);
1918         LASSERT(list_empty(&obd_stale_exports));
1919 }
1920
1921 /***** Kernel-userspace comm helpers *******/
1922
1923 /* Get length of entire message, including header */
1924 int kuc_len(int payload_len)
1925 {
1926         return sizeof(struct kuc_hdr) + payload_len;
1927 }
1928 EXPORT_SYMBOL(kuc_len);
1929
1930 /* Get a pointer to kuc header, given a ptr to the payload
1931  * @param p Pointer to payload area
1932  * @returns Pointer to kuc header
1933  */
1934 struct kuc_hdr * kuc_ptr(void *p)
1935 {
1936         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1937         LASSERT(lh->kuc_magic == KUC_MAGIC);
1938         return lh;
1939 }
1940 EXPORT_SYMBOL(kuc_ptr);
1941
1942 /* Alloc space for a message, and fill in header
1943  * @return Pointer to payload area
1944  */
1945 void *kuc_alloc(int payload_len, int transport, int type)
1946 {
1947         struct kuc_hdr *lh;
1948         int len = kuc_len(payload_len);
1949
1950         OBD_ALLOC(lh, len);
1951         if (lh == NULL)
1952                 return ERR_PTR(-ENOMEM);
1953
1954         lh->kuc_magic = KUC_MAGIC;
1955         lh->kuc_transport = transport;
1956         lh->kuc_msgtype = type;
1957         lh->kuc_msglen = len;
1958
1959         return (void *)(lh + 1);
1960 }
1961 EXPORT_SYMBOL(kuc_alloc);
1962
/* Free a message given a pointer to its payload area and the payload length
 * it was allocated with. */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);
	int total_len = kuc_len(payload_len);

	OBD_FREE(hdr, total_len);
}
EXPORT_SYMBOL(kuc_free);
1970
1971 struct obd_request_slot_waiter {
1972         struct list_head        orsw_entry;
1973         wait_queue_head_t       orsw_waitq;
1974         bool                    orsw_signaled;
1975 };
1976
1977 static bool obd_request_slot_avail(struct client_obd *cli,
1978                                    struct obd_request_slot_waiter *orsw)
1979 {
1980         bool avail;
1981
1982         spin_lock(&cli->cl_loi_list_lock);
1983         avail = !!list_empty(&orsw->orsw_entry);
1984         spin_unlock(&cli->cl_loi_list_lock);
1985
1986         return avail;
1987 };
1988
1989 /*
1990  * For network flow control, the RPC sponsor needs to acquire a credit
1991  * before sending the RPC. The credits count for a connection is defined
1992  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1993  * the subsequent RPC sponsors need to wait until others released their
1994  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1995  */
1996 int obd_get_request_slot(struct client_obd *cli)
1997 {
1998         struct obd_request_slot_waiter   orsw;
1999         int                              rc;
2000
2001         spin_lock(&cli->cl_loi_list_lock);
2002         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2003                 cli->cl_rpcs_in_flight++;
2004                 spin_unlock(&cli->cl_loi_list_lock);
2005                 return 0;
2006         }
2007
2008         init_waitqueue_head(&orsw.orsw_waitq);
2009         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2010         orsw.orsw_signaled = false;
2011         spin_unlock(&cli->cl_loi_list_lock);
2012
2013         rc = l_wait_event_abortable(orsw.orsw_waitq,
2014                                     obd_request_slot_avail(cli, &orsw) ||
2015                                     orsw.orsw_signaled);
2016
2017         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2018          * freed but other (such as obd_put_request_slot) is using it. */
2019         spin_lock(&cli->cl_loi_list_lock);
2020         if (rc != 0) {
2021                 if (!orsw.orsw_signaled) {
2022                         if (list_empty(&orsw.orsw_entry))
2023                                 cli->cl_rpcs_in_flight--;
2024                         else
2025                                 list_del(&orsw.orsw_entry);
2026                 }
2027                 rc = -EINTR;
2028         }
2029
2030         if (orsw.orsw_signaled) {
2031                 LASSERT(list_empty(&orsw.orsw_entry));
2032
2033                 rc = -EINTR;
2034         }
2035         spin_unlock(&cli->cl_loi_list_lock);
2036
2037         return rc;
2038 }
2039 EXPORT_SYMBOL(obd_get_request_slot);
2040
2041 void obd_put_request_slot(struct client_obd *cli)
2042 {
2043         struct obd_request_slot_waiter *orsw;
2044
2045         spin_lock(&cli->cl_loi_list_lock);
2046         cli->cl_rpcs_in_flight--;
2047
2048         /* If there is free slot, wakeup the first waiter. */
2049         if (!list_empty(&cli->cl_flight_waiters) &&
2050             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2051                 orsw = list_first_entry(&cli->cl_flight_waiters,
2052                                         struct obd_request_slot_waiter,
2053                                         orsw_entry);
2054                 list_del_init(&orsw->orsw_entry);
2055                 cli->cl_rpcs_in_flight++;
2056                 wake_up(&orsw->orsw_waitq);
2057         }
2058         spin_unlock(&cli->cl_loi_list_lock);
2059 }
2060 EXPORT_SYMBOL(obd_put_request_slot);
2061
2062 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2063 {
2064         return cli->cl_max_rpcs_in_flight;
2065 }
2066 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2067
2068 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2069 {
2070         struct obd_request_slot_waiter *orsw;
2071         __u32                           old;
2072         int                             diff;
2073         int                             i;
2074         int                             rc;
2075
2076         if (max > OBD_MAX_RIF_MAX || max < 1)
2077                 return -ERANGE;
2078
2079         CDEBUG(D_INFO, "%s: max = %hu max_mod = %u rif = %u\n",
2080                cli->cl_import->imp_obd->obd_name, max,
2081                cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
2082
2083         if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
2084                    LUSTRE_MDC_NAME) == 0) {
2085                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2086                  * strictly lower that max_rpcs_in_flight */
2087                 if (max < 2) {
2088                         CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2089                                cli->cl_import->imp_obd->obd_name);
2090                         return -ERANGE;
2091                 }
2092                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2093                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2094                         if (rc != 0)
2095                                 return rc;
2096                 }
2097         }
2098
2099         spin_lock(&cli->cl_loi_list_lock);
2100         old = cli->cl_max_rpcs_in_flight;
2101         cli->cl_max_rpcs_in_flight = max;
2102         client_adjust_max_dirty(cli);
2103
2104         diff = max - old;
2105
2106         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2107         for (i = 0; i < diff; i++) {
2108                 orsw = list_first_entry_or_null(&cli->cl_loi_read_list,
2109                                                 struct obd_request_slot_waiter,
2110                                                 orsw_entry);
2111                 if (!orsw)
2112                         break;
2113
2114                 list_del_init(&orsw->orsw_entry);
2115                 cli->cl_rpcs_in_flight++;
2116                 wake_up(&orsw->orsw_waitq);
2117         }
2118         spin_unlock(&cli->cl_loi_list_lock);
2119
2120         return 0;
2121 }
2122 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2123
2124 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2125 {
2126         return cli->cl_max_mod_rpcs_in_flight;
2127 }
2128 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2129
2130 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2131 {
2132         struct obd_connect_data *ocd;
2133         __u16 maxmodrpcs;
2134         __u16 prev;
2135
2136         if (max > OBD_MAX_RIF_MAX || max < 1)
2137                 return -ERANGE;
2138
2139         ocd = &cli->cl_import->imp_connect_data;
2140         CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
2141                cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
2142                ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
2143
2144         if (max == OBD_MAX_RIF_MAX)
2145                 max = OBD_MAX_RIF_MAX - 1;
2146
2147         /* Cannot exceed or equal max_rpcs_in_flight.  If we are asked to
2148          * increase this value, also bump up max_rpcs_in_flight to match.
2149          */
2150         if (max >= cli->cl_max_rpcs_in_flight) {
2151                 CDEBUG(D_INFO,
2152                        "%s: increasing max_rpcs_in_flight=%hu to allow larger max_mod_rpcs_in_flight=%u\n",
2153                        cli->cl_import->imp_obd->obd_name, max + 1, max);
2154                 obd_set_max_rpcs_in_flight(cli, max + 1);
2155         }
2156
2157         /* cannot exceed max modify RPCs in flight supported by the server,
2158          * but verify ocd_connect_flags is at least initialized first.  If
2159          * not, allow it and fix value later in ptlrpc_connect_set_flags().
2160          */
2161         if (!ocd->ocd_connect_flags) {
2162                 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2163         } else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
2164                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2165                 if (maxmodrpcs == 0) { /* connection not finished yet */
2166                         maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2167                         CDEBUG(D_INFO,
2168                                "%s: partial connect, assume maxmodrpcs=%hu\n",
2169                                cli->cl_import->imp_obd->obd_name, maxmodrpcs);
2170                 }
2171         } else {
2172                 maxmodrpcs = 1;
2173         }
2174         if (max > maxmodrpcs) {
2175                 CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than ocd_maxmodrpcs=%hu returned by the server at connection\n",
2176                        cli->cl_import->imp_obd->obd_name,
2177                        max, maxmodrpcs);
2178                 return -ERANGE;
2179         }
2180
2181         spin_lock(&cli->cl_mod_rpcs_lock);
2182
2183         prev = cli->cl_max_mod_rpcs_in_flight;
2184         cli->cl_max_mod_rpcs_in_flight = max;
2185
2186         /* wakeup waiters if limit has been increased */
2187         if (cli->cl_max_mod_rpcs_in_flight > prev)
2188                 wake_up(&cli->cl_mod_rpcs_waitq);
2189
2190         spin_unlock(&cli->cl_mod_rpcs_lock);
2191
2192         return 0;
2193 }
2194 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2195
2196 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2197                                struct seq_file *seq)
2198 {
2199         unsigned long mod_tot = 0, mod_cum;
2200         int i;
2201
2202         spin_lock(&cli->cl_mod_rpcs_lock);
2203         lprocfs_stats_header(seq, ktime_get(), cli->cl_mod_rpcs_init, 25,
2204                              ":", true);
2205         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2206                    cli->cl_mod_rpcs_in_flight);
2207
2208         seq_printf(seq, "\n\t\t\tmodify\n");
2209         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2210
2211         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2212
2213         mod_cum = 0;
2214         for (i = 0; i < OBD_HIST_MAX; i++) {
2215                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2216
2217                 mod_cum += mod;
2218                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2219                            i, mod, pct(mod, mod_tot),
2220                            pct(mod_cum, mod_tot));
2221                 if (mod_cum == mod_tot)
2222                         break;
2223         }
2224
2225         spin_unlock(&cli->cl_mod_rpcs_lock);
2226
2227         return 0;
2228 }
2229 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2230
2231 /* The number of modify RPCs sent in parallel is limited
2232  * because the server has a finite number of slots per client to
2233  * store request result and ensure reply reconstruction when needed.
2234  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2235  * that takes into account server limit and cl_max_rpcs_in_flight
2236  * value.
2237  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2238  * one close request is allowed above the maximum.
2239  */
2240 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2241                                                  bool close_req)
2242 {
2243         bool avail;
2244
2245         /* A slot is available if
2246          * - number of modify RPCs in flight is less than the max
2247          * - it's a close RPC and no other close request is in flight
2248          */
2249         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2250                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2251
2252         return avail;
2253 }
2254
2255 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2256                                          bool close_req)
2257 {
2258         bool avail;
2259
2260         spin_lock(&cli->cl_mod_rpcs_lock);
2261         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2262         spin_unlock(&cli->cl_mod_rpcs_lock);
2263         return avail;
2264 }
2265
2266
2267 /* Get a modify RPC slot from the obd client @cli according
2268  * to the kind of operation @opc that is going to be sent
2269  * and the intent @it of the operation if it applies.
2270  * If the maximum number of modify RPCs in flight is reached
2271  * the thread is put to sleep.
2272  * Returns the tag to be set in the request message. Tag 0
2273  * is reserved for non-modifying requests.
2274  */
2275 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2276 {
2277         bool                    close_req = false;
2278         __u16                   i, max;
2279
2280         if (opc == MDS_CLOSE)
2281                 close_req = true;
2282
2283         do {
2284                 spin_lock(&cli->cl_mod_rpcs_lock);
2285                 max = cli->cl_max_mod_rpcs_in_flight;
2286                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2287                         /* there is a slot available */
2288                         cli->cl_mod_rpcs_in_flight++;
2289                         if (close_req)
2290                                 cli->cl_close_rpcs_in_flight++;
2291                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2292                                          cli->cl_mod_rpcs_in_flight);
2293                         /* find a free tag */
2294                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2295                                                 max + 1);
2296                         LASSERT(i < OBD_MAX_RIF_MAX);
2297                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2298                         spin_unlock(&cli->cl_mod_rpcs_lock);
2299                         /* tag 0 is reserved for non-modify RPCs */
2300
2301                         CDEBUG(D_RPCTRACE,
2302                                "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2303                                cli->cl_import->imp_obd->obd_name,
2304                                i + 1, opc, max);
2305
2306                         return i + 1;
2307                 }
2308                 spin_unlock(&cli->cl_mod_rpcs_lock);
2309
2310                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2311                        "opc %u, max %hu\n",
2312                        cli->cl_import->imp_obd->obd_name, opc, max);
2313
2314                 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2315                                           obd_mod_rpc_slot_avail(cli,
2316                                                                  close_req));
2317         } while (true);
2318 }
2319 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2320
2321 /* Put a modify RPC slot from the obd client @cli according
2322  * to the kind of operation @opc that has been sent.
2323  */
2324 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2325 {
2326         bool                    close_req = false;
2327
2328         if (tag == 0)
2329                 return;
2330
2331         if (opc == MDS_CLOSE)
2332                 close_req = true;
2333
2334         spin_lock(&cli->cl_mod_rpcs_lock);
2335         cli->cl_mod_rpcs_in_flight--;
2336         if (close_req)
2337                 cli->cl_close_rpcs_in_flight--;
2338         /* release the tag in the bitmap */
2339         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2340         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2341         spin_unlock(&cli->cl_mod_rpcs_lock);
2342         /* LU-14741 - to prevent close RPCs stuck behind normal ones */
2343         if (close_req)
2344                 wake_up_all(&cli->cl_mod_rpcs_waitq);
2345         else
2346                 wake_up(&cli->cl_mod_rpcs_waitq);
2347 }
2348 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2349