Whamcloud - gitweb
LU-6142 lustre: remove remaining users of ldebugfs_register
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
49 DEFINE_RWLOCK(obd_dev_lock);
50 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
51
52 static struct kmem_cache *obd_device_cachep;
53 static struct kobj_type class_ktype;
54 static struct workqueue_struct *zombie_wq;
55
56 static void obd_zombie_export_add(struct obd_export *exp);
57 static void obd_zombie_import_add(struct obd_import *imp);
58 static void print_export_data(struct obd_export *exp,
59                               const char *status, int locks, int debug_level);
60
61 static LIST_HEAD(obd_stale_exports);
62 static DEFINE_SPINLOCK(obd_stale_export_lock);
63 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
64
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
66 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
67
68 /*
69  * support functions: we could use inter-module communication, but this
70  * is more portable to other OS's
71  */
72 static struct obd_device *obd_device_alloc(void)
73 {
74         struct obd_device *obd;
75
76         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
77         if (obd != NULL) {
78                 obd->obd_magic = OBD_DEVICE_MAGIC;
79         }
80         return obd;
81 }
82
83 static void obd_device_free(struct obd_device *obd)
84 {
85         LASSERT(obd != NULL);
86         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88         if (obd->obd_namespace != NULL) {
89                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90                        obd, obd->obd_namespace, obd->obd_force);
91                 LBUG();
92         }
93         lu_ref_fini(&obd->obd_reference);
94         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
95 }
96
97 struct obd_type *class_search_type(const char *name)
98 {
99         struct kobject *kobj = kset_find_obj(lustre_kset, name);
100
101         if (kobj && kobj->ktype == &class_ktype)
102                 return container_of(kobj, struct obd_type, typ_kobj);
103
104         kobject_put(kobj);
105         return NULL;
106 }
107 EXPORT_SYMBOL(class_search_type);
108
109 struct obd_type *class_get_type(const char *name)
110 {
111         struct obd_type *type;
112
113         type = class_search_type(name);
114 #ifdef HAVE_MODULE_LOADING_SUPPORT
115         if (!type) {
116                 const char *modname = name;
117
118 #ifdef HAVE_SERVER_SUPPORT
119                 if (strcmp(modname, "obdfilter") == 0)
120                         modname = "ofd";
121
122                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123                         modname = LUSTRE_OSP_NAME;
124
125                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126                         modname = LUSTRE_MDT_NAME;
127 #endif /* HAVE_SERVER_SUPPORT */
128
129                 if (!request_module("%s", modname)) {
130                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131                         type = class_search_type(name);
132                 } else {
133                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134                                            modname);
135                 }
136         }
137 #endif
138         if (type) {
139                 if (try_module_get(type->typ_dt_ops->o_owner)) {
140                         atomic_inc(&type->typ_refcnt);
141                         /* class_search_type() returned a counted reference,
142                          * but we don't need that count any more as
143                          * we have one through typ_refcnt.
144                          */
145                         kobject_put(&type->typ_kobj);
146                 } else {
147                         kobject_put(&type->typ_kobj);
148                         type = NULL;
149                 }
150         }
151         return type;
152 }
153
154 void class_put_type(struct obd_type *type)
155 {
156         LASSERT(type);
157         module_put(type->typ_dt_ops->o_owner);
158         atomic_dec(&type->typ_refcnt);
159 }
160
161 static void class_sysfs_release(struct kobject *kobj)
162 {
163         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
164
165         debugfs_remove_recursive(type->typ_debugfs_entry);
166         type->typ_debugfs_entry = NULL;
167
168         if (type->typ_lu)
169                 lu_device_type_fini(type->typ_lu);
170
171 #ifdef CONFIG_PROC_FS
172         if (type->typ_name && type->typ_procroot)
173                 remove_proc_subtree(type->typ_name, proc_lustre_root);
174 #endif
175         OBD_FREE(type, sizeof(*type));
176 }
177
178 static struct kobj_type class_ktype = {
179         .sysfs_ops      = &lustre_sysfs_ops,
180         .release        = class_sysfs_release,
181 };
182
183 #ifdef HAVE_SERVER_SUPPORT
184 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
185 {
186         struct dentry *symlink;
187         struct obd_type *type;
188         int rc;
189
190         type = class_search_type(name);
191         if (type) {
192                 kobject_put(&type->typ_kobj);
193                 return ERR_PTR(-EEXIST);
194         }
195
196         OBD_ALLOC(type, sizeof(*type));
197         if (!type)
198                 return ERR_PTR(-ENOMEM);
199
200         type->typ_kobj.kset = lustre_kset;
201         rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
202                                   &lustre_kset->kobj, "%s", name);
203         if (rc)
204                 return ERR_PTR(rc);
205
206         symlink = debugfs_create_dir(name, debugfs_lustre_root);
207         if (IS_ERR_OR_NULL(symlink)) {
208                 rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
209                 kobject_put(&type->typ_kobj);
210                 return ERR_PTR(rc);
211         }
212         type->typ_debugfs_entry = symlink;
213         type->typ_sym_filter = true;
214
215         if (enable_proc) {
216                 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
217                                                       NULL, NULL);
218                 if (IS_ERR(type->typ_procroot)) {
219                         CERROR("%s: can't create compat proc entry: %d\n",
220                                name, (int)PTR_ERR(type->typ_procroot));
221                         type->typ_procroot = NULL;
222                 }
223         }
224
225         return type;
226 }
227 EXPORT_SYMBOL(class_add_symlinks);
228 #endif /* HAVE_SERVER_SUPPORT */
229
230 #define CLASS_MAX_NAME 1024
231
232 int class_register_type(const struct obd_ops *dt_ops,
233                         const struct md_ops *md_ops,
234                         bool enable_proc, struct lprocfs_vars *vars,
235                         const char *name, struct lu_device_type *ldt)
236 {
237         struct obd_type *type;
238         int rc;
239
240         ENTRY;
241         /* sanity check */
242         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
243
244         type = class_search_type(name);
245         if (type) {
246 #ifdef HAVE_SERVER_SUPPORT
247                 if (type->typ_sym_filter)
248                         goto dir_exist;
249 #endif /* HAVE_SERVER_SUPPORT */
250                 kobject_put(&type->typ_kobj);
251                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
252                 RETURN(-EEXIST);
253         }
254
255         OBD_ALLOC(type, sizeof(*type));
256         if (type == NULL)
257                 RETURN(-ENOMEM);
258
259         type->typ_kobj.kset = lustre_kset;
260         kobject_init(&type->typ_kobj, &class_ktype);
261 #ifdef HAVE_SERVER_SUPPORT
262 dir_exist:
263 #endif /* HAVE_SERVER_SUPPORT */
264
265         type->typ_dt_ops = dt_ops;
266         type->typ_md_ops = md_ops;
267
268 #ifdef HAVE_SERVER_SUPPORT
269         if (type->typ_sym_filter) {
270                 type->typ_sym_filter = false;
271                 kobject_put(&type->typ_kobj);
272                 goto setup_ldt;
273         }
274 #endif
275 #ifdef CONFIG_PROC_FS
276         if (enable_proc && !type->typ_procroot) {
277                 type->typ_procroot = lprocfs_register(name,
278                                                       proc_lustre_root,
279                                                       NULL, type);
280                 if (IS_ERR(type->typ_procroot)) {
281                         rc = PTR_ERR(type->typ_procroot);
282                         type->typ_procroot = NULL;
283                         GOTO(failed, rc);
284                 }
285         }
286 #endif
287         type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
288         ldebugfs_add_vars(type->typ_debugfs_entry, vars, type);
289
290         rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
291         if (rc)
292                 GOTO(failed, rc);
293 #ifdef HAVE_SERVER_SUPPORT
294 setup_ldt:
295 #endif
296         if (ldt) {
297                 type->typ_lu = ldt;
298                 rc = lu_device_type_init(ldt);
299                 if (rc)
300                         GOTO(failed, rc);
301         }
302
303         RETURN(0);
304
305 failed:
306         kobject_put(&type->typ_kobj);
307
308         RETURN(rc);
309 }
310 EXPORT_SYMBOL(class_register_type);
311
312 int class_unregister_type(const char *name)
313 {
314         struct obd_type *type = class_search_type(name);
315         int rc = 0;
316         ENTRY;
317
318         if (!type) {
319                 CERROR("unknown obd type\n");
320                 RETURN(-EINVAL);
321         }
322
323         if (atomic_read(&type->typ_refcnt)) {
324                 CERROR("type %s has refcount (%d)\n", name,
325                        atomic_read(&type->typ_refcnt));
326                 /* This is a bad situation, let's make the best of it */
327                 /* Remove ops, but leave the name for debugging */
328                 type->typ_dt_ops = NULL;
329                 type->typ_md_ops = NULL;
330                 GOTO(out_put, rc = -EBUSY);
331         }
332
333         /* Put the final ref */
334         kobject_put(&type->typ_kobj);
335 out_put:
336         /* Put the ref returned by class_search_type() */
337         kobject_put(&type->typ_kobj);
338
339         RETURN(rc);
340 } /* class_unregister_type */
341 EXPORT_SYMBOL(class_unregister_type);
342
343 /**
344  * Create a new obd device.
345  *
346  * Allocate the new obd_device and initialize it.
347  *
348  * \param[in] type_name obd device type string.
349  * \param[in] name      obd device name.
350  * \param[in] uuid      obd device UUID
351  *
352  * \retval newdev         pointer to created obd_device
353  * \retval ERR_PTR(errno) on error
354  */
355 struct obd_device *class_newdev(const char *type_name, const char *name,
356                                 const char *uuid)
357 {
358         struct obd_device *newdev;
359         struct obd_type *type = NULL;
360         ENTRY;
361
362         if (strlen(name) >= MAX_OBD_NAME) {
363                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
364                 RETURN(ERR_PTR(-EINVAL));
365         }
366
367         type = class_get_type(type_name);
368         if (type == NULL){
369                 CERROR("OBD: unknown type: %s\n", type_name);
370                 RETURN(ERR_PTR(-ENODEV));
371         }
372
373         newdev = obd_device_alloc();
374         if (newdev == NULL) {
375                 class_put_type(type);
376                 RETURN(ERR_PTR(-ENOMEM));
377         }
378         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
379         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
380         newdev->obd_type = type;
381         newdev->obd_minor = -1;
382
383         rwlock_init(&newdev->obd_pool_lock);
384         newdev->obd_pool_limit = 0;
385         newdev->obd_pool_slv = 0;
386
387         INIT_LIST_HEAD(&newdev->obd_exports);
388         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
389         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
390         INIT_LIST_HEAD(&newdev->obd_exports_timed);
391         INIT_LIST_HEAD(&newdev->obd_nid_stats);
392         spin_lock_init(&newdev->obd_nid_lock);
393         spin_lock_init(&newdev->obd_dev_lock);
394         mutex_init(&newdev->obd_dev_mutex);
395         spin_lock_init(&newdev->obd_osfs_lock);
396         /* newdev->obd_osfs_age must be set to a value in the distant
397          * past to guarantee a fresh statfs is fetched on mount. */
398         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
399
400         /* XXX belongs in setup not attach  */
401         init_rwsem(&newdev->obd_observer_link_sem);
402         /* recovery data */
403         spin_lock_init(&newdev->obd_recovery_task_lock);
404         init_waitqueue_head(&newdev->obd_next_transno_waitq);
405         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
406         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
407         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
408         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
409         INIT_LIST_HEAD(&newdev->obd_evict_list);
410         INIT_LIST_HEAD(&newdev->obd_lwp_list);
411
412         llog_group_init(&newdev->obd_olg);
413         /* Detach drops this */
414         atomic_set(&newdev->obd_refcount, 1);
415         lu_ref_init(&newdev->obd_reference);
416         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
417
418         newdev->obd_conn_inprogress = 0;
419
420         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
421
422         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
423                newdev->obd_name, newdev);
424
425         return newdev;
426 }
427
428 /**
429  * Free obd device.
430  *
431  * \param[in] obd obd_device to be freed
432  *
433  * \retval none
434  */
435 void class_free_dev(struct obd_device *obd)
436 {
437         struct obd_type *obd_type = obd->obd_type;
438
439         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
440                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
441         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
442                  "obd %p != obd_devs[%d] %p\n",
443                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
444         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
445                  "obd_refcount should be 0, not %d\n",
446                  atomic_read(&obd->obd_refcount));
447         LASSERT(obd_type != NULL);
448
449         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
450                obd->obd_name, obd->obd_type->typ_name);
451
452         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
453                          obd->obd_name, obd->obd_uuid.uuid);
454         if (obd->obd_stopping) {
455                 int err;
456
457                 /* If we're not stopping, we were never set up */
458                 err = obd_cleanup(obd);
459                 if (err)
460                         CERROR("Cleanup %s returned %d\n",
461                                 obd->obd_name, err);
462         }
463
464         obd_device_free(obd);
465
466         class_put_type(obd_type);
467 }
468
469 /**
470  * Unregister obd device.
471  *
472  * Free slot in obd_dev[] used by \a obd.
473  *
474  * \param[in] new_obd obd_device to be unregistered
475  *
476  * \retval none
477  */
478 void class_unregister_device(struct obd_device *obd)
479 {
480         write_lock(&obd_dev_lock);
481         if (obd->obd_minor >= 0) {
482                 LASSERT(obd_devs[obd->obd_minor] == obd);
483                 obd_devs[obd->obd_minor] = NULL;
484                 obd->obd_minor = -1;
485         }
486         write_unlock(&obd_dev_lock);
487 }
488
489 /**
490  * Register obd device.
491  *
492  * Find free slot in obd_devs[], fills it with \a new_obd.
493  *
494  * \param[in] new_obd obd_device to be registered
495  *
496  * \retval 0          success
497  * \retval -EEXIST    device with this name is registered
498  * \retval -EOVERFLOW obd_devs[] is full
499  */
500 int class_register_device(struct obd_device *new_obd)
501 {
502         int ret = 0;
503         int i;
504         int new_obd_minor = 0;
505         bool minor_assign = false;
506         bool retried = false;
507
508 again:
509         write_lock(&obd_dev_lock);
510         for (i = 0; i < class_devno_max(); i++) {
511                 struct obd_device *obd = class_num2obd(i);
512
513                 if (obd != NULL &&
514                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
515
516                         if (!retried) {
517                                 write_unlock(&obd_dev_lock);
518
519                                 /* the obd_device could be waited to be
520                                  * destroyed by the "obd_zombie_impexp_thread".
521                                  */
522                                 obd_zombie_barrier();
523                                 retried = true;
524                                 goto again;
525                         }
526
527                         CERROR("%s: already exists, won't add\n",
528                                obd->obd_name);
529                         /* in case we found a free slot before duplicate */
530                         minor_assign = false;
531                         ret = -EEXIST;
532                         break;
533                 }
534                 if (!minor_assign && obd == NULL) {
535                         new_obd_minor = i;
536                         minor_assign = true;
537                 }
538         }
539
540         if (minor_assign) {
541                 new_obd->obd_minor = new_obd_minor;
542                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
543                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
544                 obd_devs[new_obd_minor] = new_obd;
545         } else {
546                 if (ret == 0) {
547                         ret = -EOVERFLOW;
548                         CERROR("%s: all %u/%u devices used, increase "
549                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
550                                i, class_devno_max(), ret);
551                 }
552         }
553         write_unlock(&obd_dev_lock);
554
555         RETURN(ret);
556 }
557
558 static int class_name2dev_nolock(const char *name)
559 {
560         int i;
561
562         if (!name)
563                 return -1;
564
565         for (i = 0; i < class_devno_max(); i++) {
566                 struct obd_device *obd = class_num2obd(i);
567
568                 if (obd && strcmp(name, obd->obd_name) == 0) {
569                         /* Make sure we finished attaching before we give
570                            out any references */
571                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
572                         if (obd->obd_attached) {
573                                 return i;
574                         }
575                         break;
576                 }
577         }
578
579         return -1;
580 }
581
582 int class_name2dev(const char *name)
583 {
584         int i;
585
586         if (!name)
587                 return -1;
588
589         read_lock(&obd_dev_lock);
590         i = class_name2dev_nolock(name);
591         read_unlock(&obd_dev_lock);
592
593         return i;
594 }
595 EXPORT_SYMBOL(class_name2dev);
596
/*
 * Return the attached obd device called \a name, or NULL when there is
 * none.  No reference is taken on the returned device.
 */
struct obd_device *class_name2obd(const char *name)
{
        int dev = class_name2dev(name);

        /* valid indices are 0 .. class_devno_max() - 1 */
        if (dev < 0 || dev >= class_devno_max())
                return NULL;
        return class_num2obd(dev);
}
605 EXPORT_SYMBOL(class_name2obd);
606
607 int class_uuid2dev_nolock(struct obd_uuid *uuid)
608 {
609         int i;
610
611         for (i = 0; i < class_devno_max(); i++) {
612                 struct obd_device *obd = class_num2obd(i);
613
614                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
615                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
616                         return i;
617                 }
618         }
619
620         return -1;
621 }
622
623 int class_uuid2dev(struct obd_uuid *uuid)
624 {
625         int i;
626
627         read_lock(&obd_dev_lock);
628         i = class_uuid2dev_nolock(uuid);
629         read_unlock(&obd_dev_lock);
630
631         return i;
632 }
633 EXPORT_SYMBOL(class_uuid2dev);
634
/*
 * Return the obd device whose UUID equals \a uuid, or NULL when there is
 * none.  No reference is taken on the returned device.
 */
struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
{
        int minor = class_uuid2dev(uuid);

        return minor < 0 ? NULL : class_num2obd(minor);
}
642 EXPORT_SYMBOL(class_uuid2obd);
643
644 /**
645  * Get obd device from ::obd_devs[]
646  *
647  * \param num [in] array index
648  *
649  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
650  *         otherwise return the obd device there.
651  */
652 struct obd_device *class_num2obd(int num)
653 {
654         struct obd_device *obd = NULL;
655
656         if (num < class_devno_max()) {
657                 obd = obd_devs[num];
658                 if (obd == NULL)
659                         return NULL;
660
661                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
662                          "%p obd_magic %08x != %08x\n",
663                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
664                 LASSERTF(obd->obd_minor == num,
665                          "%p obd_minor %0d != %0d\n",
666                          obd, obd->obd_minor, num);
667         }
668
669         return obd;
670 }
671
672 /**
673  * Find obd in obd_dev[] by name or uuid.
674  *
675  * Increment obd's refcount if found.
676  *
677  * \param[in] str obd name or uuid
678  *
679  * \retval NULL    if not found
680  * \retval target  pointer to found obd_device
681  */
682 struct obd_device *class_dev_by_str(const char *str)
683 {
684         struct obd_device *target = NULL;
685         struct obd_uuid tgtuuid;
686         int rc;
687
688         obd_str2uuid(&tgtuuid, str);
689
690         read_lock(&obd_dev_lock);
691         rc = class_uuid2dev_nolock(&tgtuuid);
692         if (rc < 0)
693                 rc = class_name2dev_nolock(str);
694
695         if (rc >= 0)
696                 target = class_num2obd(rc);
697
698         if (target != NULL)
699                 class_incref(target, "find", current);
700         read_unlock(&obd_dev_lock);
701
702         RETURN(target);
703 }
704 EXPORT_SYMBOL(class_dev_by_str);
705
706 /**
707  * Get obd devices count. Device in any
708  *    state are counted
709  * \retval obd device count
710  */
711 int get_devices_count(void)
712 {
713         int index, max_index = class_devno_max(), dev_count = 0;
714
715         read_lock(&obd_dev_lock);
716         for (index = 0; index <= max_index; index++) {
717                 struct obd_device *obd = class_num2obd(index);
718                 if (obd != NULL)
719                         dev_count++;
720         }
721         read_unlock(&obd_dev_lock);
722
723         return dev_count;
724 }
725 EXPORT_SYMBOL(get_devices_count);
726
727 void class_obd_list(void)
728 {
729         char *status;
730         int i;
731
732         read_lock(&obd_dev_lock);
733         for (i = 0; i < class_devno_max(); i++) {
734                 struct obd_device *obd = class_num2obd(i);
735
736                 if (obd == NULL)
737                         continue;
738                 if (obd->obd_stopping)
739                         status = "ST";
740                 else if (obd->obd_set_up)
741                         status = "UP";
742                 else if (obd->obd_attached)
743                         status = "AT";
744                 else
745                         status = "--";
746                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
747                          i, status, obd->obd_type->typ_name,
748                          obd->obd_name, obd->obd_uuid.uuid,
749                          atomic_read(&obd->obd_refcount));
750         }
751         read_unlock(&obd_dev_lock);
752 }
753
754 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
755  * specified, then only the client with that uuid is returned,
756  * otherwise any client connected to the tgt is returned.
757  */
758 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
759                                          const char *type_name,
760                                          struct obd_uuid *grp_uuid)
761 {
762         int i;
763
764         read_lock(&obd_dev_lock);
765         for (i = 0; i < class_devno_max(); i++) {
766                 struct obd_device *obd = class_num2obd(i);
767
768                 if (obd == NULL)
769                         continue;
770                 if ((strncmp(obd->obd_type->typ_name, type_name,
771                              strlen(type_name)) == 0)) {
772                         if (obd_uuid_equals(tgt_uuid,
773                                             &obd->u.cli.cl_target_uuid) &&
774                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
775                                                          &obd->obd_uuid) : 1)) {
776                                 read_unlock(&obd_dev_lock);
777                                 return obd;
778                         }
779                 }
780         }
781         read_unlock(&obd_dev_lock);
782
783         return NULL;
784 }
785 EXPORT_SYMBOL(class_find_client_obd);
786
787 /* Iterate the obd_device list looking devices have grp_uuid. Start
788  * searching at *next, and if a device is found, the next index to look
789  * at is saved in *next. If next is NULL, then the first matching device
790  * will always be returned.
791  */
792 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
793 {
794         int i;
795
796         if (next == NULL)
797                 i = 0;
798         else if (*next >= 0 && *next < class_devno_max())
799                 i = *next;
800         else
801                 return NULL;
802
803         read_lock(&obd_dev_lock);
804         for (; i < class_devno_max(); i++) {
805                 struct obd_device *obd = class_num2obd(i);
806
807                 if (obd == NULL)
808                         continue;
809                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
810                         if (next != NULL)
811                                 *next = i+1;
812                         read_unlock(&obd_dev_lock);
813                         return obd;
814                 }
815         }
816         read_unlock(&obd_dev_lock);
817
818         return NULL;
819 }
820 EXPORT_SYMBOL(class_devices_in_group);
821
822 /**
823  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
824  * adjust sptlrpc settings accordingly.
825  */
826 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
827 {
828         struct obd_device  *obd;
829         const char         *type;
830         int                 i, rc = 0, rc2;
831
832         LASSERT(namelen > 0);
833
834         read_lock(&obd_dev_lock);
835         for (i = 0; i < class_devno_max(); i++) {
836                 obd = class_num2obd(i);
837
838                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
839                         continue;
840
841                 /* only notify mdc, osc, osp, lwp, mdt, ost
842                  * because only these have a -sptlrpc llog */
843                 type = obd->obd_type->typ_name;
844                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
845                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
846                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
847                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
848                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
849                     strcmp(type, LUSTRE_OST_NAME) != 0)
850                         continue;
851
852                 if (strncmp(obd->obd_name, fsname, namelen))
853                         continue;
854
855                 class_incref(obd, __FUNCTION__, obd);
856                 read_unlock(&obd_dev_lock);
857                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
858                                          sizeof(KEY_SPTLRPC_CONF),
859                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
860                 rc = rc ? rc : rc2;
861                 class_decref(obd, __FUNCTION__, obd);
862                 read_lock(&obd_dev_lock);
863         }
864         read_unlock(&obd_dev_lock);
865         return rc;
866 }
867 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
868
869 void obd_cleanup_caches(void)
870 {
871         ENTRY;
872         if (obd_device_cachep) {
873                 kmem_cache_destroy(obd_device_cachep);
874                 obd_device_cachep = NULL;
875         }
876
877         EXIT;
878 }
879
880 int obd_init_caches(void)
881 {
882         int rc;
883         ENTRY;
884
885         LASSERT(obd_device_cachep == NULL);
886         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
887                                 sizeof(struct obd_device),
888                                 0, 0, 0, sizeof(struct obd_device), NULL);
889         if (!obd_device_cachep)
890                 GOTO(out, rc = -ENOMEM);
891
892         RETURN(0);
893 out:
894         obd_cleanup_caches();
895         RETURN(rc);
896 }
897
898 static const char export_handle_owner[] = "export";
899
900 /* map connection to client */
901 struct obd_export *class_conn2export(struct lustre_handle *conn)
902 {
903         struct obd_export *export;
904         ENTRY;
905
906         if (!conn) {
907                 CDEBUG(D_CACHE, "looking for null handle\n");
908                 RETURN(NULL);
909         }
910
911         if (conn->cookie == -1) {  /* this means assign a new connection */
912                 CDEBUG(D_CACHE, "want a new connection\n");
913                 RETURN(NULL);
914         }
915
916         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
917         export = class_handle2object(conn->cookie, export_handle_owner);
918         RETURN(export);
919 }
920 EXPORT_SYMBOL(class_conn2export);
921
922 struct obd_device *class_exp2obd(struct obd_export *exp)
923 {
924         if (exp)
925                 return exp->exp_obd;
926         return NULL;
927 }
928 EXPORT_SYMBOL(class_exp2obd);
929
930 struct obd_import *class_exp2cliimp(struct obd_export *exp)
931 {
932         struct obd_device *obd = exp->exp_obd;
933         if (obd == NULL)
934                 return NULL;
935         return obd->u.cli.cl_import;
936 }
937 EXPORT_SYMBOL(class_exp2cliimp);
938
939 /* Export management functions */
940 static void class_export_destroy(struct obd_export *exp)
941 {
942         struct obd_device *obd = exp->exp_obd;
943         ENTRY;
944
945         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
946         LASSERT(obd != NULL);
947
948         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
949                exp->exp_client_uuid.uuid, obd->obd_name);
950
951         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
952         if (exp->exp_connection)
953                 ptlrpc_put_connection_superhack(exp->exp_connection);
954
955         LASSERT(list_empty(&exp->exp_outstanding_replies));
956         LASSERT(list_empty(&exp->exp_uncommitted_replies));
957         LASSERT(list_empty(&exp->exp_req_replay_queue));
958         LASSERT(list_empty(&exp->exp_hp_rpcs));
959         obd_destroy_export(exp);
960         /* self export doesn't hold a reference to an obd, although it
961          * exists until freeing of the obd */
962         if (exp != obd->obd_self_export)
963                 class_decref(obd, "export", exp);
964
965         OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
966         kfree_rcu(exp, exp_handle.h_rcu);
967         EXIT;
968 }
969
970 struct obd_export *class_export_get(struct obd_export *exp)
971 {
972         refcount_inc(&exp->exp_handle.h_ref);
973         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
974                refcount_read(&exp->exp_handle.h_ref));
975         return exp;
976 }
977 EXPORT_SYMBOL(class_export_get);
978
979 void class_export_put(struct obd_export *exp)
980 {
981         LASSERT(exp != NULL);
982         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
983         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
984         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
985                refcount_read(&exp->exp_handle.h_ref) - 1);
986
987         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
988                 struct obd_device *obd = exp->exp_obd;
989
990                 CDEBUG(D_IOCTL, "final put %p/%s\n",
991                        exp, exp->exp_client_uuid.uuid);
992
993                 /* release nid stat refererence */
994                 lprocfs_exp_cleanup(exp);
995
996                 if (exp == obd->obd_self_export) {
997                         /* self export should be destroyed without
998                          * zombie thread as it doesn't hold a
999                          * reference to obd and doesn't hold any
1000                          * resources */
1001                         class_export_destroy(exp);
1002                         /* self export is destroyed, no class
1003                          * references exist and it is safe to free
1004                          * obd */
1005                         class_free_dev(obd);
1006                 } else {
1007                         LASSERT(!list_empty(&exp->exp_obd_chain));
1008                         obd_zombie_export_add(exp);
1009                 }
1010
1011         }
1012 }
1013 EXPORT_SYMBOL(class_export_put);
1014
1015 static void obd_zombie_exp_cull(struct work_struct *ws)
1016 {
1017         struct obd_export *export;
1018
1019         export = container_of(ws, struct obd_export, exp_zombie_work);
1020         class_export_destroy(export);
1021 }
1022
1023 /* Creates a new export, adds it to the hash table, and returns a
1024  * pointer to it. The refcount is 2: one for the hash reference, and
1025  * one for the pointer returned by this function. */
1026 struct obd_export *__class_new_export(struct obd_device *obd,
1027                                       struct obd_uuid *cluuid, bool is_self)
1028 {
1029         struct obd_export *export;
1030         int rc = 0;
1031         ENTRY;
1032
1033         OBD_ALLOC_PTR(export);
1034         if (!export)
1035                 return ERR_PTR(-ENOMEM);
1036
1037         export->exp_conn_cnt = 0;
1038         export->exp_lock_hash = NULL;
1039         export->exp_flock_hash = NULL;
1040         /* 2 = class_handle_hash + last */
1041         refcount_set(&export->exp_handle.h_ref, 2);
1042         atomic_set(&export->exp_rpc_count, 0);
1043         atomic_set(&export->exp_cb_count, 0);
1044         atomic_set(&export->exp_locks_count, 0);
1045 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1046         INIT_LIST_HEAD(&export->exp_locks_list);
1047         spin_lock_init(&export->exp_locks_list_guard);
1048 #endif
1049         atomic_set(&export->exp_replay_count, 0);
1050         export->exp_obd = obd;
1051         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1052         spin_lock_init(&export->exp_uncommitted_replies_lock);
1053         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1054         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1055         INIT_HLIST_NODE(&export->exp_handle.h_link);
1056         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1057         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1058         class_handle_hash(&export->exp_handle, export_handle_owner);
1059         export->exp_last_request_time = ktime_get_real_seconds();
1060         spin_lock_init(&export->exp_lock);
1061         spin_lock_init(&export->exp_rpc_lock);
1062         INIT_HLIST_NODE(&export->exp_nid_hash);
1063         INIT_HLIST_NODE(&export->exp_gen_hash);
1064         spin_lock_init(&export->exp_bl_list_lock);
1065         INIT_LIST_HEAD(&export->exp_bl_list);
1066         INIT_LIST_HEAD(&export->exp_stale_list);
1067         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1068
1069         export->exp_sp_peer = LUSTRE_SP_ANY;
1070         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1071         export->exp_client_uuid = *cluuid;
1072         obd_init_export(export);
1073
1074         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1075
1076         spin_lock(&obd->obd_dev_lock);
1077         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1078                 /* shouldn't happen, but might race */
1079                 if (obd->obd_stopping)
1080                         GOTO(exit_unlock, rc = -ENODEV);
1081
1082                 rc = obd_uuid_add(obd, export);
1083                 if (rc != 0) {
1084                         LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1085                                       obd->obd_name, cluuid->uuid, rc);
1086                         GOTO(exit_unlock, rc = -EALREADY);
1087                 }
1088         }
1089
1090         if (!is_self) {
1091                 class_incref(obd, "export", export);
1092                 list_add_tail(&export->exp_obd_chain_timed,
1093                               &obd->obd_exports_timed);
1094                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1095                 obd->obd_num_exports++;
1096         } else {
1097                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1098                 INIT_LIST_HEAD(&export->exp_obd_chain);
1099         }
1100         spin_unlock(&obd->obd_dev_lock);
1101         RETURN(export);
1102
1103 exit_unlock:
1104         spin_unlock(&obd->obd_dev_lock);
1105         class_handle_unhash(&export->exp_handle);
1106         obd_destroy_export(export);
1107         OBD_FREE_PTR(export);
1108         return ERR_PTR(rc);
1109 }
1110
1111 struct obd_export *class_new_export(struct obd_device *obd,
1112                                     struct obd_uuid *uuid)
1113 {
1114         return __class_new_export(obd, uuid, false);
1115 }
1116 EXPORT_SYMBOL(class_new_export);
1117
1118 struct obd_export *class_new_export_self(struct obd_device *obd,
1119                                          struct obd_uuid *uuid)
1120 {
1121         return __class_new_export(obd, uuid, true);
1122 }
1123
1124 void class_unlink_export(struct obd_export *exp)
1125 {
1126         class_handle_unhash(&exp->exp_handle);
1127
1128         if (exp->exp_obd->obd_self_export == exp) {
1129                 class_export_put(exp);
1130                 return;
1131         }
1132
1133         spin_lock(&exp->exp_obd->obd_dev_lock);
1134         /* delete an uuid-export hashitem from hashtables */
1135         if (exp != exp->exp_obd->obd_self_export)
1136                 obd_uuid_del(exp->exp_obd, exp);
1137
1138 #ifdef HAVE_SERVER_SUPPORT
1139         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1140                 struct tg_export_data   *ted = &exp->exp_target_data;
1141                 struct cfs_hash         *hash;
1142
1143                 /* Because obd_gen_hash will not be released until
1144                  * class_cleanup(), so hash should never be NULL here */
1145                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1146                 LASSERT(hash != NULL);
1147                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1148                              &exp->exp_gen_hash);
1149                 cfs_hash_putref(hash);
1150         }
1151 #endif /* HAVE_SERVER_SUPPORT */
1152
1153         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1154         list_del_init(&exp->exp_obd_chain_timed);
1155         exp->exp_obd->obd_num_exports--;
1156         spin_unlock(&exp->exp_obd->obd_dev_lock);
1157         atomic_inc(&obd_stale_export_num);
1158
1159         /* A reference is kept by obd_stale_exports list */
1160         obd_stale_export_put(exp);
1161 }
1162 EXPORT_SYMBOL(class_unlink_export);
1163
1164 /* Import management functions */
1165 static void obd_zombie_import_free(struct obd_import *imp)
1166 {
1167         ENTRY;
1168
1169         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1170                 imp->imp_obd->obd_name);
1171
1172         LASSERT(refcount_read(&imp->imp_refcount) == 0);
1173
1174         ptlrpc_put_connection_superhack(imp->imp_connection);
1175
1176         while (!list_empty(&imp->imp_conn_list)) {
1177                 struct obd_import_conn *imp_conn;
1178
1179                 imp_conn = list_first_entry(&imp->imp_conn_list,
1180                                             struct obd_import_conn, oic_item);
1181                 list_del_init(&imp_conn->oic_item);
1182                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1183                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1184         }
1185
1186         LASSERT(imp->imp_sec == NULL);
1187         class_decref(imp->imp_obd, "import", imp);
1188         OBD_FREE_PTR(imp);
1189         EXIT;
1190 }
1191
1192 struct obd_import *class_import_get(struct obd_import *import)
1193 {
1194         refcount_inc(&import->imp_refcount);
1195         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1196                refcount_read(&import->imp_refcount),
1197                import->imp_obd->obd_name);
1198         return import;
1199 }
1200 EXPORT_SYMBOL(class_import_get);
1201
1202 void class_import_put(struct obd_import *imp)
1203 {
1204         ENTRY;
1205
1206         LASSERT(refcount_read(&imp->imp_refcount) > 0);
1207
1208         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1209                refcount_read(&imp->imp_refcount) - 1,
1210                imp->imp_obd->obd_name);
1211
1212         if (refcount_dec_and_test(&imp->imp_refcount)) {
1213                 CDEBUG(D_INFO, "final put import %p\n", imp);
1214                 obd_zombie_import_add(imp);
1215         }
1216
1217         EXIT;
1218 }
1219 EXPORT_SYMBOL(class_import_put);
1220
1221 static void init_imp_at(struct imp_at *at) {
1222         int i;
1223         at_init(&at->iat_net_latency, 0, 0);
1224         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1225                 /* max service estimates are tracked on the server side, so
1226                    don't use the AT history here, just use the last reported
1227                    val. (But keep hist for proc histogram, worst_ever) */
1228                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1229                         AT_FLG_NOHIST);
1230         }
1231 }
1232
1233 static void obd_zombie_imp_cull(struct work_struct *ws)
1234 {
1235         struct obd_import *import;
1236
1237         import = container_of(ws, struct obd_import, imp_zombie_work);
1238         obd_zombie_import_free(import);
1239 }
1240
1241 struct obd_import *class_new_import(struct obd_device *obd)
1242 {
1243         struct obd_import *imp;
1244         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1245
1246         OBD_ALLOC(imp, sizeof(*imp));
1247         if (imp == NULL)
1248                 return NULL;
1249
1250         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1251         INIT_LIST_HEAD(&imp->imp_replay_list);
1252         INIT_LIST_HEAD(&imp->imp_sending_list);
1253         INIT_LIST_HEAD(&imp->imp_delayed_list);
1254         INIT_LIST_HEAD(&imp->imp_committed_list);
1255         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1256         imp->imp_known_replied_xid = 0;
1257         imp->imp_replay_cursor = &imp->imp_committed_list;
1258         spin_lock_init(&imp->imp_lock);
1259         imp->imp_last_success_conn = 0;
1260         imp->imp_state = LUSTRE_IMP_NEW;
1261         imp->imp_obd = class_incref(obd, "import", imp);
1262         rwlock_init(&imp->imp_sec_lock);
1263         init_waitqueue_head(&imp->imp_recovery_waitq);
1264         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1265
1266         if (curr_pid_ns && curr_pid_ns->child_reaper)
1267                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1268         else
1269                 imp->imp_sec_refpid = 1;
1270
1271         refcount_set(&imp->imp_refcount, 2);
1272         atomic_set(&imp->imp_unregistering, 0);
1273         atomic_set(&imp->imp_inflight, 0);
1274         atomic_set(&imp->imp_replay_inflight, 0);
1275         atomic_set(&imp->imp_inval_count, 0);
1276         INIT_LIST_HEAD(&imp->imp_conn_list);
1277         init_imp_at(&imp->imp_at);
1278
1279         /* the default magic is V2, will be used in connect RPC, and
1280          * then adjusted according to the flags in request/reply. */
1281         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1282
1283         return imp;
1284 }
1285 EXPORT_SYMBOL(class_new_import);
1286
1287 void class_destroy_import(struct obd_import *import)
1288 {
1289         LASSERT(import != NULL);
1290         LASSERT(import != LP_POISON);
1291
1292         spin_lock(&import->imp_lock);
1293         import->imp_generation++;
1294         spin_unlock(&import->imp_lock);
1295         class_import_put(import);
1296 }
1297 EXPORT_SYMBOL(class_destroy_import);
1298
1299 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1300
1301 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1302 {
1303         spin_lock(&exp->exp_locks_list_guard);
1304
1305         LASSERT(lock->l_exp_refs_nr >= 0);
1306
1307         if (lock->l_exp_refs_target != NULL &&
1308             lock->l_exp_refs_target != exp) {
1309                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1310                               exp, lock, lock->l_exp_refs_target);
1311         }
1312         if ((lock->l_exp_refs_nr ++) == 0) {
1313                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1314                 lock->l_exp_refs_target = exp;
1315         }
1316         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1317                lock, exp, lock->l_exp_refs_nr);
1318         spin_unlock(&exp->exp_locks_list_guard);
1319 }
1320 EXPORT_SYMBOL(__class_export_add_lock_ref);
1321
1322 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1323 {
1324         spin_lock(&exp->exp_locks_list_guard);
1325         LASSERT(lock->l_exp_refs_nr > 0);
1326         if (lock->l_exp_refs_target != exp) {
1327                 LCONSOLE_WARN("lock %p, "
1328                               "mismatching export pointers: %p, %p\n",
1329                               lock, lock->l_exp_refs_target, exp);
1330         }
1331         if (-- lock->l_exp_refs_nr == 0) {
1332                 list_del_init(&lock->l_exp_refs_link);
1333                 lock->l_exp_refs_target = NULL;
1334         }
1335         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1336                lock, exp, lock->l_exp_refs_nr);
1337         spin_unlock(&exp->exp_locks_list_guard);
1338 }
1339 EXPORT_SYMBOL(__class_export_del_lock_ref);
1340 #endif
1341
1342 /* A connection defines an export context in which preallocation can
1343    be managed. This releases the export pointer reference, and returns
1344    the export handle, so the export refcount is 1 when this function
1345    returns. */
1346 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1347                   struct obd_uuid *cluuid)
1348 {
1349         struct obd_export *export;
1350         LASSERT(conn != NULL);
1351         LASSERT(obd != NULL);
1352         LASSERT(cluuid != NULL);
1353         ENTRY;
1354
1355         export = class_new_export(obd, cluuid);
1356         if (IS_ERR(export))
1357                 RETURN(PTR_ERR(export));
1358
1359         conn->cookie = export->exp_handle.h_cookie;
1360         class_export_put(export);
1361
1362         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1363                cluuid->uuid, conn->cookie);
1364         RETURN(0);
1365 }
1366 EXPORT_SYMBOL(class_connect);
1367
1368 /* if export is involved in recovery then clean up related things */
1369 static void class_export_recovery_cleanup(struct obd_export *exp)
1370 {
1371         struct obd_device *obd = exp->exp_obd;
1372
1373         spin_lock(&obd->obd_recovery_task_lock);
1374         if (obd->obd_recovering) {
1375                 if (exp->exp_in_recovery) {
1376                         spin_lock(&exp->exp_lock);
1377                         exp->exp_in_recovery = 0;
1378                         spin_unlock(&exp->exp_lock);
1379                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1380                         atomic_dec(&obd->obd_connected_clients);
1381                 }
1382
1383                 /* if called during recovery then should update
1384                  * obd_stale_clients counter,
1385                  * lightweight exports are not counted */
1386                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1387                         exp->exp_obd->obd_stale_clients++;
1388         }
1389         spin_unlock(&obd->obd_recovery_task_lock);
1390
1391         spin_lock(&exp->exp_lock);
1392         /** Cleanup req replay fields */
1393         if (exp->exp_req_replay_needed) {
1394                 exp->exp_req_replay_needed = 0;
1395
1396                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1397                 atomic_dec(&obd->obd_req_replay_clients);
1398         }
1399
1400         /** Cleanup lock replay data */
1401         if (exp->exp_lock_replay_needed) {
1402                 exp->exp_lock_replay_needed = 0;
1403
1404                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1405                 atomic_dec(&obd->obd_lock_replay_clients);
1406         }
1407         spin_unlock(&exp->exp_lock);
1408 }
1409
1410 /* This function removes 1-3 references from the export:
1411  * 1 - for export pointer passed
1412  * and if disconnect really need
1413  * 2 - removing from hash
1414  * 3 - in client_unlink_export
1415  * The export pointer passed to this function can destroyed */
1416 int class_disconnect(struct obd_export *export)
1417 {
1418         int already_disconnected;
1419         ENTRY;
1420
1421         if (export == NULL) {
1422                 CWARN("attempting to free NULL export %p\n", export);
1423                 RETURN(-EINVAL);
1424         }
1425
1426         spin_lock(&export->exp_lock);
1427         already_disconnected = export->exp_disconnected;
1428         export->exp_disconnected = 1;
1429         /*  We hold references of export for uuid hash
1430          *  and nid_hash and export link at least. So
1431          *  it is safe to call cfs_hash_del in there.  */
1432         if (!hlist_unhashed(&export->exp_nid_hash))
1433                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1434                              &export->exp_connection->c_peer.nid,
1435                              &export->exp_nid_hash);
1436         spin_unlock(&export->exp_lock);
1437
1438         /* class_cleanup(), abort_recovery(), and class_fail_export()
1439          * all end up in here, and if any of them race we shouldn't
1440          * call extra class_export_puts(). */
1441         if (already_disconnected) {
1442                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1443                 GOTO(no_disconn, already_disconnected);
1444         }
1445
1446         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1447                export->exp_handle.h_cookie);
1448
1449         class_export_recovery_cleanup(export);
1450         class_unlink_export(export);
1451 no_disconn:
1452         class_export_put(export);
1453         RETURN(0);
1454 }
1455 EXPORT_SYMBOL(class_disconnect);
1456
1457 /* Return non-zero for a fully connected export */
1458 int class_connected_export(struct obd_export *exp)
1459 {
1460         int connected = 0;
1461
1462         if (exp) {
1463                 spin_lock(&exp->exp_lock);
1464                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1465                 spin_unlock(&exp->exp_lock);
1466         }
1467         return connected;
1468 }
1469 EXPORT_SYMBOL(class_connected_export);
1470
1471 static void class_disconnect_export_list(struct list_head *list,
1472                                          enum obd_option flags)
1473 {
1474         int rc;
1475         struct obd_export *exp;
1476         ENTRY;
1477
1478         /* It's possible that an export may disconnect itself, but
1479          * nothing else will be added to this list. */
1480         while (!list_empty(list)) {
1481                 exp = list_first_entry(list, struct obd_export,
1482                                        exp_obd_chain);
1483                 /* need for safe call CDEBUG after obd_disconnect */
1484                 class_export_get(exp);
1485
1486                 spin_lock(&exp->exp_lock);
1487                 exp->exp_flags = flags;
1488                 spin_unlock(&exp->exp_lock);
1489
1490                 if (obd_uuid_equals(&exp->exp_client_uuid,
1491                                     &exp->exp_obd->obd_uuid)) {
1492                         CDEBUG(D_HA,
1493                                "exp %p export uuid == obd uuid, don't discon\n",
1494                                exp);
1495                         /* Need to delete this now so we don't end up pointing
1496                          * to work_list later when this export is cleaned up. */
1497                         list_del_init(&exp->exp_obd_chain);
1498                         class_export_put(exp);
1499                         continue;
1500                 }
1501
1502                 class_export_get(exp);
1503                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1504                        "last request at %lld\n",
1505                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1506                        exp, exp->exp_last_request_time);
1507                 /* release one export reference anyway */
1508                 rc = obd_disconnect(exp);
1509
1510                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1511                        obd_export_nid2str(exp), exp, rc);
1512                 class_export_put(exp);
1513         }
1514         EXIT;
1515 }
1516
1517 void class_disconnect_exports(struct obd_device *obd)
1518 {
1519         LIST_HEAD(work_list);
1520         ENTRY;
1521
1522         /* Move all of the exports from obd_exports to a work list, en masse. */
1523         spin_lock(&obd->obd_dev_lock);
1524         list_splice_init(&obd->obd_exports, &work_list);
1525         list_splice_init(&obd->obd_delayed_exports, &work_list);
1526         spin_unlock(&obd->obd_dev_lock);
1527
1528         if (!list_empty(&work_list)) {
1529                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1530                        "disconnecting them\n", obd->obd_minor, obd);
1531                 class_disconnect_export_list(&work_list,
1532                                              exp_flags_from_obd(obd));
1533         } else
1534                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1535                        obd->obd_minor, obd);
1536         EXIT;
1537 }
1538 EXPORT_SYMBOL(class_disconnect_exports);
1539
1540 /* Remove exports that have not completed recovery.
1541  */
1542 void class_disconnect_stale_exports(struct obd_device *obd,
1543                                     int (*test_export)(struct obd_export *))
1544 {
1545         LIST_HEAD(work_list);
1546         struct obd_export *exp, *n;
1547         int evicted = 0;
1548         ENTRY;
1549
1550         spin_lock(&obd->obd_dev_lock);
1551         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1552                                  exp_obd_chain) {
1553                 /* don't count self-export as client */
1554                 if (obd_uuid_equals(&exp->exp_client_uuid,
1555                                     &exp->exp_obd->obd_uuid))
1556                         continue;
1557
1558                 /* don't evict clients which have no slot in last_rcvd
1559                  * (e.g. lightweight connection) */
1560                 if (exp->exp_target_data.ted_lr_idx == -1)
1561                         continue;
1562
1563                 spin_lock(&exp->exp_lock);
1564                 if (exp->exp_failed || test_export(exp)) {
1565                         spin_unlock(&exp->exp_lock);
1566                         continue;
1567                 }
1568                 exp->exp_failed = 1;
1569                 spin_unlock(&exp->exp_lock);
1570
1571                 list_move(&exp->exp_obd_chain, &work_list);
1572                 evicted++;
1573                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1574                        obd->obd_name, exp->exp_client_uuid.uuid,
1575                        obd_export_nid2str(exp));
1576                 print_export_data(exp, "EVICTING", 0, D_HA);
1577         }
1578         spin_unlock(&obd->obd_dev_lock);
1579
1580         if (evicted)
1581                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1582                               obd->obd_name, evicted);
1583
1584         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1585                                                  OBD_OPT_ABORT_RECOV);
1586         EXIT;
1587 }
1588 EXPORT_SYMBOL(class_disconnect_stale_exports);
1589
1590 void class_fail_export(struct obd_export *exp)
1591 {
1592         int rc, already_failed;
1593
1594         spin_lock(&exp->exp_lock);
1595         already_failed = exp->exp_failed;
1596         exp->exp_failed = 1;
1597         spin_unlock(&exp->exp_lock);
1598
1599         if (already_failed) {
1600                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1601                        exp, exp->exp_client_uuid.uuid);
1602                 return;
1603         }
1604
1605         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1606                exp, exp->exp_client_uuid.uuid);
1607
1608         if (obd_dump_on_timeout)
1609                 libcfs_debug_dumplog();
1610
1611         /* need for safe call CDEBUG after obd_disconnect */
1612         class_export_get(exp);
1613
1614         /* Most callers into obd_disconnect are removing their own reference
1615          * (request, for example) in addition to the one from the hash table.
1616          * We don't have such a reference here, so make one. */
1617         class_export_get(exp);
1618         rc = obd_disconnect(exp);
1619         if (rc)
1620                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1621         else
1622                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1623                        exp, exp->exp_client_uuid.uuid);
1624         class_export_put(exp);
1625 }
1626 EXPORT_SYMBOL(class_fail_export);
1627
1628 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1629 {
1630         struct cfs_hash *nid_hash;
1631         struct obd_export *doomed_exp = NULL;
1632         int exports_evicted = 0;
1633
1634         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1635
1636         spin_lock(&obd->obd_dev_lock);
1637         /* umount has run already, so evict thread should leave
1638          * its task to umount thread now */
1639         if (obd->obd_stopping) {
1640                 spin_unlock(&obd->obd_dev_lock);
1641                 return exports_evicted;
1642         }
1643         nid_hash = obd->obd_nid_hash;
1644         cfs_hash_getref(nid_hash);
1645         spin_unlock(&obd->obd_dev_lock);
1646
1647         do {
1648                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1649                 if (doomed_exp == NULL)
1650                         break;
1651
1652                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1653                          "nid %s found, wanted nid %s, requested nid %s\n",
1654                          obd_export_nid2str(doomed_exp),
1655                          libcfs_nid2str(nid_key), nid);
1656                 LASSERTF(doomed_exp != obd->obd_self_export,
1657                          "self-export is hashed by NID?\n");
1658                 exports_evicted++;
1659                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1660                               "request\n", obd->obd_name,
1661                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1662                               obd_export_nid2str(doomed_exp));
1663                 class_fail_export(doomed_exp);
1664                 class_export_put(doomed_exp);
1665         } while (1);
1666
1667         cfs_hash_putref(nid_hash);
1668
1669         if (!exports_evicted)
1670                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1671                        obd->obd_name, nid);
1672         return exports_evicted;
1673 }
1674 EXPORT_SYMBOL(obd_export_evict_by_nid);
1675
1676 #ifdef HAVE_SERVER_SUPPORT
1677 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1678 {
1679         struct obd_export *doomed_exp = NULL;
1680         struct obd_uuid doomed_uuid;
1681         int exports_evicted = 0;
1682
1683         spin_lock(&obd->obd_dev_lock);
1684         if (obd->obd_stopping) {
1685                 spin_unlock(&obd->obd_dev_lock);
1686                 return exports_evicted;
1687         }
1688         spin_unlock(&obd->obd_dev_lock);
1689
1690         obd_str2uuid(&doomed_uuid, uuid);
1691         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1692                 CERROR("%s: can't evict myself\n", obd->obd_name);
1693                 return exports_evicted;
1694         }
1695
1696         doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1697         if (doomed_exp == NULL) {
1698                 CERROR("%s: can't disconnect %s: no exports found\n",
1699                        obd->obd_name, uuid);
1700         } else {
1701                 CWARN("%s: evicting %s at adminstrative request\n",
1702                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1703                 class_fail_export(doomed_exp);
1704                 class_export_put(doomed_exp);
1705                 obd_uuid_del(obd, doomed_exp);
1706                 exports_evicted++;
1707         }
1708
1709         return exports_evicted;
1710 }
1711 #endif /* HAVE_SERVER_SUPPORT */
1712
1713 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback invoked by print_export_data() when lock dumping is
 * requested; it stays NULL (static storage) until something sets it. */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
1716 #endif
1717
1718 static void print_export_data(struct obd_export *exp, const char *status,
1719                               int locks, int debug_level)
1720 {
1721         struct ptlrpc_reply_state *rs;
1722         struct ptlrpc_reply_state *first_reply = NULL;
1723         int nreplies = 0;
1724
1725         spin_lock(&exp->exp_lock);
1726         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1727                             rs_exp_list) {
1728                 if (nreplies == 0)
1729                         first_reply = rs;
1730                 nreplies++;
1731         }
1732         spin_unlock(&exp->exp_lock);
1733
1734         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1735                "%p %s %llu stale:%d\n",
1736                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1737                obd_export_nid2str(exp),
1738                refcount_read(&exp->exp_handle.h_ref),
1739                atomic_read(&exp->exp_rpc_count),
1740                atomic_read(&exp->exp_cb_count),
1741                atomic_read(&exp->exp_locks_count),
1742                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1743                nreplies, first_reply, nreplies > 3 ? "..." : "",
1744                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1745 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1746         if (locks && class_export_dump_hook != NULL)
1747                 class_export_dump_hook(exp);
1748 #endif
1749 }
1750
1751 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1752 {
1753         struct obd_export *exp;
1754
1755         spin_lock(&obd->obd_dev_lock);
1756         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1757                 print_export_data(exp, "ACTIVE", locks, debug_level);
1758         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1759                 print_export_data(exp, "UNLINKED", locks, debug_level);
1760         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1761                 print_export_data(exp, "DELAYED", locks, debug_level);
1762         spin_unlock(&obd->obd_dev_lock);
1763 }
1764
1765 void obd_exports_barrier(struct obd_device *obd)
1766 {
1767         int waited = 2;
1768         LASSERT(list_empty(&obd->obd_exports));
1769         spin_lock(&obd->obd_dev_lock);
1770         while (!list_empty(&obd->obd_unlinked_exports)) {
1771                 spin_unlock(&obd->obd_dev_lock);
1772                 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1773                 if (waited > 5 && is_power_of_2(waited)) {
1774                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1775                                       "more than %d seconds. "
1776                                       "The obd refcount = %d. Is it stuck?\n",
1777                                       obd->obd_name, waited,
1778                                       atomic_read(&obd->obd_refcount));
1779                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1780                 }
1781                 waited *= 2;
1782                 spin_lock(&obd->obd_dev_lock);
1783         }
1784         spin_unlock(&obd->obd_dev_lock);
1785 }
1786 EXPORT_SYMBOL(obd_exports_barrier);
1787
1788 /**
1789  * Add export to the obd_zombe thread and notify it.
1790  */
1791 static void obd_zombie_export_add(struct obd_export *exp) {
1792         atomic_dec(&obd_stale_export_num);
1793         spin_lock(&exp->exp_obd->obd_dev_lock);
1794         LASSERT(!list_empty(&exp->exp_obd_chain));
1795         list_del_init(&exp->exp_obd_chain);
1796         spin_unlock(&exp->exp_obd->obd_dev_lock);
1797
1798         queue_work(zombie_wq, &exp->exp_zombie_work);
1799 }
1800
1801 /**
1802  * Add import to the obd_zombe thread and notify it.
1803  */
1804 static void obd_zombie_import_add(struct obd_import *imp) {
1805         LASSERT(imp->imp_sec == NULL);
1806
1807         queue_work(zombie_wq, &imp->imp_zombie_work);
1808 }
1809
1810 /**
1811  * wait when obd_zombie import/export queues become empty
1812  */
1813 void obd_zombie_barrier(void)
1814 {
1815         flush_workqueue(zombie_wq);
1816 }
1817 EXPORT_SYMBOL(obd_zombie_barrier);
1818
1819
1820 struct obd_export *obd_stale_export_get(void)
1821 {
1822         struct obd_export *exp = NULL;
1823         ENTRY;
1824
1825         spin_lock(&obd_stale_export_lock);
1826         if (!list_empty(&obd_stale_exports)) {
1827                 exp = list_first_entry(&obd_stale_exports,
1828                                        struct obd_export, exp_stale_list);
1829                 list_del_init(&exp->exp_stale_list);
1830         }
1831         spin_unlock(&obd_stale_export_lock);
1832
1833         if (exp) {
1834                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1835                        atomic_read(&obd_stale_export_num));
1836         }
1837         RETURN(exp);
1838 }
1839 EXPORT_SYMBOL(obd_stale_export_get);
1840
1841 void obd_stale_export_put(struct obd_export *exp)
1842 {
1843         ENTRY;
1844
1845         LASSERT(list_empty(&exp->exp_stale_list));
1846         if (exp->exp_lock_hash &&
1847             atomic_read(&exp->exp_lock_hash->hs_count)) {
1848                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1849                        atomic_read(&obd_stale_export_num));
1850
1851                 spin_lock_bh(&exp->exp_bl_list_lock);
1852                 spin_lock(&obd_stale_export_lock);
1853                 /* Add to the tail if there is no blocked locks,
1854                  * to the head otherwise. */
1855                 if (list_empty(&exp->exp_bl_list))
1856                         list_add_tail(&exp->exp_stale_list,
1857                                       &obd_stale_exports);
1858                 else
1859                         list_add(&exp->exp_stale_list,
1860                                  &obd_stale_exports);
1861
1862                 spin_unlock(&obd_stale_export_lock);
1863                 spin_unlock_bh(&exp->exp_bl_list_lock);
1864         } else {
1865                 class_export_put(exp);
1866         }
1867         EXIT;
1868 }
1869 EXPORT_SYMBOL(obd_stale_export_put);
1870
1871 /**
1872  * Adjust the position of the export in the stale list,
1873  * i.e. move to the head of the list if is needed.
1874  **/
1875 void obd_stale_export_adjust(struct obd_export *exp)
1876 {
1877         LASSERT(exp != NULL);
1878         spin_lock_bh(&exp->exp_bl_list_lock);
1879         spin_lock(&obd_stale_export_lock);
1880
1881         if (!list_empty(&exp->exp_stale_list) &&
1882             !list_empty(&exp->exp_bl_list))
1883                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1884
1885         spin_unlock(&obd_stale_export_lock);
1886         spin_unlock_bh(&exp->exp_bl_list_lock);
1887 }
1888 EXPORT_SYMBOL(obd_stale_export_adjust);
1889
1890 /**
1891  * start destroy zombie import/export thread
1892  */
1893 int obd_zombie_impexp_init(void)
1894 {
1895         zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1896         if (!zombie_wq)
1897                 return -ENOMEM;
1898
1899         return 0;
1900 }
1901
1902 /**
1903  * stop destroy zombie import/export thread
1904  */
1905 void obd_zombie_impexp_stop(void)
1906 {
1907         destroy_workqueue(zombie_wq);
1908         LASSERT(list_empty(&obd_stale_exports));
1909 }
1910
1911 /***** Kernel-userspace comm helpers *******/
1912
1913 /* Get length of entire message, including header */
1914 int kuc_len(int payload_len)
1915 {
1916         return sizeof(struct kuc_hdr) + payload_len;
1917 }
1918 EXPORT_SYMBOL(kuc_len);
1919
1920 /* Get a pointer to kuc header, given a ptr to the payload
1921  * @param p Pointer to payload area
1922  * @returns Pointer to kuc header
1923  */
1924 struct kuc_hdr * kuc_ptr(void *p)
1925 {
1926         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1927         LASSERT(lh->kuc_magic == KUC_MAGIC);
1928         return lh;
1929 }
1930 EXPORT_SYMBOL(kuc_ptr);
1931
1932 /* Alloc space for a message, and fill in header
1933  * @return Pointer to payload area
1934  */
1935 void *kuc_alloc(int payload_len, int transport, int type)
1936 {
1937         struct kuc_hdr *lh;
1938         int len = kuc_len(payload_len);
1939
1940         OBD_ALLOC(lh, len);
1941         if (lh == NULL)
1942                 return ERR_PTR(-ENOMEM);
1943
1944         lh->kuc_magic = KUC_MAGIC;
1945         lh->kuc_transport = transport;
1946         lh->kuc_msgtype = type;
1947         lh->kuc_msglen = len;
1948
1949         return (void *)(lh + 1);
1950 }
1951 EXPORT_SYMBOL(kuc_alloc);
1952
/* Free a message given a pointer to its payload area @p and the
 * payload length @payload_len; the header in front of the payload is
 * freed together with it.
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *hdr = kuc_ptr(p);

        OBD_FREE(hdr, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1960
1961 struct obd_request_slot_waiter {
1962         struct list_head        orsw_entry;
1963         wait_queue_head_t       orsw_waitq;
1964         bool                    orsw_signaled;
1965 };
1966
1967 static bool obd_request_slot_avail(struct client_obd *cli,
1968                                    struct obd_request_slot_waiter *orsw)
1969 {
1970         bool avail;
1971
1972         spin_lock(&cli->cl_loi_list_lock);
1973         avail = !!list_empty(&orsw->orsw_entry);
1974         spin_unlock(&cli->cl_loi_list_lock);
1975
1976         return avail;
1977 };
1978
1979 /*
1980  * For network flow control, the RPC sponsor needs to acquire a credit
1981  * before sending the RPC. The credits count for a connection is defined
1982  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1983  * the subsequent RPC sponsors need to wait until others released their
1984  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1985  */
1986 int obd_get_request_slot(struct client_obd *cli)
1987 {
1988         struct obd_request_slot_waiter   orsw;
1989         int                              rc;
1990
1991         spin_lock(&cli->cl_loi_list_lock);
1992         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
1993                 cli->cl_rpcs_in_flight++;
1994                 spin_unlock(&cli->cl_loi_list_lock);
1995                 return 0;
1996         }
1997
1998         init_waitqueue_head(&orsw.orsw_waitq);
1999         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2000         orsw.orsw_signaled = false;
2001         spin_unlock(&cli->cl_loi_list_lock);
2002
2003         rc = l_wait_event_abortable(orsw.orsw_waitq,
2004                                     obd_request_slot_avail(cli, &orsw) ||
2005                                     orsw.orsw_signaled);
2006
2007         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2008          * freed but other (such as obd_put_request_slot) is using it. */
2009         spin_lock(&cli->cl_loi_list_lock);
2010         if (rc != 0) {
2011                 if (!orsw.orsw_signaled) {
2012                         if (list_empty(&orsw.orsw_entry))
2013                                 cli->cl_rpcs_in_flight--;
2014                         else
2015                                 list_del(&orsw.orsw_entry);
2016                 }
2017                 rc = -EINTR;
2018         }
2019
2020         if (orsw.orsw_signaled) {
2021                 LASSERT(list_empty(&orsw.orsw_entry));
2022
2023                 rc = -EINTR;
2024         }
2025         spin_unlock(&cli->cl_loi_list_lock);
2026
2027         return rc;
2028 }
2029 EXPORT_SYMBOL(obd_get_request_slot);
2030
2031 void obd_put_request_slot(struct client_obd *cli)
2032 {
2033         struct obd_request_slot_waiter *orsw;
2034
2035         spin_lock(&cli->cl_loi_list_lock);
2036         cli->cl_rpcs_in_flight--;
2037
2038         /* If there is free slot, wakeup the first waiter. */
2039         if (!list_empty(&cli->cl_flight_waiters) &&
2040             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2041                 orsw = list_first_entry(&cli->cl_flight_waiters,
2042                                         struct obd_request_slot_waiter,
2043                                         orsw_entry);
2044                 list_del_init(&orsw->orsw_entry);
2045                 cli->cl_rpcs_in_flight++;
2046                 wake_up(&orsw->orsw_waitq);
2047         }
2048         spin_unlock(&cli->cl_loi_list_lock);
2049 }
2050 EXPORT_SYMBOL(obd_put_request_slot);
2051
2052 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2053 {
2054         return cli->cl_max_rpcs_in_flight;
2055 }
2056 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2057
2058 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2059 {
2060         struct obd_request_slot_waiter *orsw;
2061         __u32                           old;
2062         int                             diff;
2063         int                             i;
2064         const char *type_name;
2065         int                             rc;
2066
2067         if (max > OBD_MAX_RIF_MAX || max < 1)
2068                 return -ERANGE;
2069
2070         type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2071         if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2072                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2073                  * strictly lower that max_rpcs_in_flight */
2074                 if (max < 2) {
2075                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2076                                "because it must be higher than "
2077                                "max_mod_rpcs_in_flight value",
2078                                cli->cl_import->imp_obd->obd_name);
2079                         return -ERANGE;
2080                 }
2081                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2082                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2083                         if (rc != 0)
2084                                 return rc;
2085                 }
2086         }
2087
2088         spin_lock(&cli->cl_loi_list_lock);
2089         old = cli->cl_max_rpcs_in_flight;
2090         cli->cl_max_rpcs_in_flight = max;
2091         client_adjust_max_dirty(cli);
2092
2093         diff = max - old;
2094
2095         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2096         for (i = 0; i < diff; i++) {
2097                 if (list_empty(&cli->cl_flight_waiters))
2098                         break;
2099
2100                 orsw = list_first_entry(&cli->cl_flight_waiters,
2101                                         struct obd_request_slot_waiter,
2102                                         orsw_entry);
2103                 list_del_init(&orsw->orsw_entry);
2104                 cli->cl_rpcs_in_flight++;
2105                 wake_up(&orsw->orsw_waitq);
2106         }
2107         spin_unlock(&cli->cl_loi_list_lock);
2108
2109         return 0;
2110 }
2111 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2112
2113 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2114 {
2115         return cli->cl_max_mod_rpcs_in_flight;
2116 }
2117 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2118
2119 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2120 {
2121         struct obd_connect_data *ocd;
2122         __u16 maxmodrpcs;
2123         __u16 prev;
2124
2125         if (max > OBD_MAX_RIF_MAX || max < 1)
2126                 return -ERANGE;
2127
2128         /* cannot exceed or equal max_rpcs_in_flight */
2129         if (max >= cli->cl_max_rpcs_in_flight) {
2130                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2131                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2132                        cli->cl_import->imp_obd->obd_name,
2133                        max, cli->cl_max_rpcs_in_flight);
2134                 return -ERANGE;
2135         }
2136
2137         /* cannot exceed max modify RPCs in flight supported by the server */
2138         ocd = &cli->cl_import->imp_connect_data;
2139         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2140                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2141         else
2142                 maxmodrpcs = 1;
2143         if (max > maxmodrpcs) {
2144                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2145                        "higher than max_mod_rpcs_per_client value (%hu) "
2146                        "returned by the server at connection\n",
2147                        cli->cl_import->imp_obd->obd_name,
2148                        max, maxmodrpcs);
2149                 return -ERANGE;
2150         }
2151
2152         spin_lock(&cli->cl_mod_rpcs_lock);
2153
2154         prev = cli->cl_max_mod_rpcs_in_flight;
2155         cli->cl_max_mod_rpcs_in_flight = max;
2156
2157         /* wakeup waiters if limit has been increased */
2158         if (cli->cl_max_mod_rpcs_in_flight > prev)
2159                 wake_up(&cli->cl_mod_rpcs_waitq);
2160
2161         spin_unlock(&cli->cl_mod_rpcs_lock);
2162
2163         return 0;
2164 }
2165 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2166
2167 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2168                                struct seq_file *seq)
2169 {
2170         unsigned long mod_tot = 0, mod_cum;
2171         struct timespec64 now;
2172         int i;
2173
2174         ktime_get_real_ts64(&now);
2175
2176         spin_lock(&cli->cl_mod_rpcs_lock);
2177
2178         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2179                    (s64)now.tv_sec, now.tv_nsec);
2180         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2181                    cli->cl_mod_rpcs_in_flight);
2182
2183         seq_printf(seq, "\n\t\t\tmodify\n");
2184         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2185
2186         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2187
2188         mod_cum = 0;
2189         for (i = 0; i < OBD_HIST_MAX; i++) {
2190                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2191                 mod_cum += mod;
2192                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2193                            i, mod, pct(mod, mod_tot),
2194                            pct(mod_cum, mod_tot));
2195                 if (mod_cum == mod_tot)
2196                         break;
2197         }
2198
2199         spin_unlock(&cli->cl_mod_rpcs_lock);
2200
2201         return 0;
2202 }
2203 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2204
2205 /* The number of modify RPCs sent in parallel is limited
2206  * because the server has a finite number of slots per client to
2207  * store request result and ensure reply reconstruction when needed.
2208  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2209  * that takes into account server limit and cl_max_rpcs_in_flight
2210  * value.
2211  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2212  * one close request is allowed above the maximum.
2213  */
2214 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2215                                                  bool close_req)
2216 {
2217         bool avail;
2218
2219         /* A slot is available if
2220          * - number of modify RPCs in flight is less than the max
2221          * - it's a close RPC and no other close request is in flight
2222          */
2223         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2224                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2225
2226         return avail;
2227 }
2228
2229 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2230                                          bool close_req)
2231 {
2232         bool avail;
2233
2234         spin_lock(&cli->cl_mod_rpcs_lock);
2235         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2236         spin_unlock(&cli->cl_mod_rpcs_lock);
2237         return avail;
2238 }
2239
2240
2241 /* Get a modify RPC slot from the obd client @cli according
2242  * to the kind of operation @opc that is going to be sent
2243  * and the intent @it of the operation if it applies.
2244  * If the maximum number of modify RPCs in flight is reached
2245  * the thread is put to sleep.
2246  * Returns the tag to be set in the request message. Tag 0
2247  * is reserved for non-modifying requests.
2248  */
2249 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2250 {
2251         bool                    close_req = false;
2252         __u16                   i, max;
2253
2254         if (opc == MDS_CLOSE)
2255                 close_req = true;
2256
2257         do {
2258                 spin_lock(&cli->cl_mod_rpcs_lock);
2259                 max = cli->cl_max_mod_rpcs_in_flight;
2260                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2261                         /* there is a slot available */
2262                         cli->cl_mod_rpcs_in_flight++;
2263                         if (close_req)
2264                                 cli->cl_close_rpcs_in_flight++;
2265                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2266                                          cli->cl_mod_rpcs_in_flight);
2267                         /* find a free tag */
2268                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2269                                                 max + 1);
2270                         LASSERT(i < OBD_MAX_RIF_MAX);
2271                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2272                         spin_unlock(&cli->cl_mod_rpcs_lock);
2273                         /* tag 0 is reserved for non-modify RPCs */
2274
2275                         CDEBUG(D_RPCTRACE,
2276                                "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2277                                cli->cl_import->imp_obd->obd_name,
2278                                i + 1, opc, max);
2279
2280                         return i + 1;
2281                 }
2282                 spin_unlock(&cli->cl_mod_rpcs_lock);
2283
2284                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2285                        "opc %u, max %hu\n",
2286                        cli->cl_import->imp_obd->obd_name, opc, max);
2287
2288                 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2289                                           obd_mod_rpc_slot_avail(cli,
2290                                                                  close_req));
2291         } while (true);
2292 }
2293 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2294
2295 /* Put a modify RPC slot from the obd client @cli according
2296  * to the kind of operation @opc that has been sent.
2297  */
2298 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2299 {
2300         bool                    close_req = false;
2301
2302         if (tag == 0)
2303                 return;
2304
2305         if (opc == MDS_CLOSE)
2306                 close_req = true;
2307
2308         spin_lock(&cli->cl_mod_rpcs_lock);
2309         cli->cl_mod_rpcs_in_flight--;
2310         if (close_req)
2311                 cli->cl_close_rpcs_in_flight--;
2312         /* release the tag in the bitmap */
2313         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2314         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2315         spin_unlock(&cli->cl_mod_rpcs_lock);
2316         wake_up(&cli->cl_mod_rpcs_waitq);
2317 }
2318 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2319