Whamcloud - gitweb
LU-13600 ptlrpc: limit rate of lock replays
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
/* Reader/writer lock taken around accesses to the obd_devs[] table. */
DEFINE_RWLOCK(obd_dev_lock);
/* Table of registered devices; a device's slot index is its obd_minor. */
static struct obd_device *obd_devs[MAX_OBD_DEVICES];

/* Slab cache that struct obd_device instances are allocated from. */
static struct kmem_cache *obd_device_cachep;
/* Forward declaration; the initializer follows class_sysfs_release(). */
static struct kobj_type class_ktype;
static struct workqueue_struct *zombie_wq;

/* Forward declarations of helpers defined later in this file. */
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks, int debug_level);

static LIST_HEAD(obd_stale_exports);
static DEFINE_SPINLOCK(obd_stale_export_lock);
static atomic_t obd_stale_export_num = ATOMIC_INIT(0);

/*
 * Hook used by class_export_destroy() to release an export's connection.
 * NOTE(review): presumably assigned by the ptlrpc module at load time --
 * confirm against the code that sets it.
 */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
67
68 /*
69  * support functions: we could use inter-module communication, but this
70  * is more portable to other OS's
71  */
72 static struct obd_device *obd_device_alloc(void)
73 {
74         struct obd_device *obd;
75
76         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
77         if (obd != NULL) {
78                 obd->obd_magic = OBD_DEVICE_MAGIC;
79         }
80         return obd;
81 }
82
83 static void obd_device_free(struct obd_device *obd)
84 {
85         LASSERT(obd != NULL);
86         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88         if (obd->obd_namespace != NULL) {
89                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90                        obd, obd->obd_namespace, obd->obd_force);
91                 LBUG();
92         }
93         lu_ref_fini(&obd->obd_reference);
94         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
95 }
96
97 struct obd_type *class_search_type(const char *name)
98 {
99         struct kobject *kobj = kset_find_obj(lustre_kset, name);
100
101         if (kobj && kobj->ktype == &class_ktype)
102                 return container_of(kobj, struct obd_type, typ_kobj);
103
104         kobject_put(kobj);
105         return NULL;
106 }
107 EXPORT_SYMBOL(class_search_type);
108
109 struct obd_type *class_get_type(const char *name)
110 {
111         struct obd_type *type;
112
113         type = class_search_type(name);
114 #ifdef HAVE_MODULE_LOADING_SUPPORT
115         if (!type) {
116                 const char *modname = name;
117
118 #ifdef HAVE_SERVER_SUPPORT
119                 if (strcmp(modname, "obdfilter") == 0)
120                         modname = "ofd";
121
122                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123                         modname = LUSTRE_OSP_NAME;
124
125                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126                         modname = LUSTRE_MDT_NAME;
127 #endif /* HAVE_SERVER_SUPPORT */
128
129                 if (!request_module("%s", modname)) {
130                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131                         type = class_search_type(name);
132                 } else {
133                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134                                            modname);
135                 }
136         }
137 #endif
138         if (type) {
139                 if (try_module_get(type->typ_dt_ops->o_owner)) {
140                         atomic_inc(&type->typ_refcnt);
141                         /* class_search_type() returned a counted reference,
142                          * but we don't need that count any more as
143                          * we have one through typ_refcnt.
144                          */
145                         kobject_put(&type->typ_kobj);
146                 } else {
147                         kobject_put(&type->typ_kobj);
148                         type = NULL;
149                 }
150         }
151         return type;
152 }
153
/*
 * Release a type reference obtained from class_get_type(): unpin the
 * owning module and decrement typ_refcnt.
 */
void class_put_type(struct obd_type *type)
{
        LASSERT(type);
        module_put(type->typ_dt_ops->o_owner);
        atomic_dec(&type->typ_refcnt);
}
160
161 static void class_sysfs_release(struct kobject *kobj)
162 {
163         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
164
165         debugfs_remove_recursive(type->typ_debugfs_entry);
166         type->typ_debugfs_entry = NULL;
167
168         if (type->typ_lu)
169                 lu_device_type_fini(type->typ_lu);
170
171 #ifdef CONFIG_PROC_FS
172         if (type->typ_name && type->typ_procroot)
173                 remove_proc_subtree(type->typ_name, proc_lustre_root);
174 #endif
175         OBD_FREE(type, sizeof(*type));
176 }
177
/*
 * kobj_type shared by every obd type kobject.  class_search_type() uses
 * its address to recognise obd types among the members of lustre_kset,
 * and class_sysfs_release() runs when the last reference is dropped.
 */
static struct kobj_type class_ktype = {
        .sysfs_ops      = &lustre_sysfs_ops,
        .release        = class_sysfs_release,
};
182
#ifdef HAVE_SERVER_SUPPORT
/**
 * Create a placeholder obd type named \a name.
 *
 * The placeholder owns a kobject in lustre_kset and a debugfs directory,
 * and optionally a /proc directory.  It is flagged with typ_sym_filter so
 * that a later class_register_type() for the same name adopts it instead
 * of failing with -EEXIST.
 *
 * \param[in] name         type name
 * \param[in] enable_proc  also create a compat /proc entry; failure to do
 *                         so is logged but is not fatal
 *
 * \retval type            pointer to the new obd_type
 * \retval ERR_PTR(errno)  -EEXIST if the name is taken, -ENOMEM on
 *                         allocation failure, or the error returned by
 *                         kobject_init_and_add()
 */
struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
{
        struct dentry *symlink;
        struct obd_type *type;
        int rc;

        type = class_search_type(name);
        if (type) {
                kobject_put(&type->typ_kobj);
                return ERR_PTR(-EEXIST);
        }

        OBD_ALLOC(type, sizeof(*type));
        if (!type)
                return ERR_PTR(-ENOMEM);

        type->typ_kobj.kset = lustre_kset;
        rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
                                  &lustre_kset->kobj, "%s", name);
        if (rc) {
                /* The kobject is initialised even when the add fails, so
                 * it has to be released with kobject_put(); this runs
                 * class_sysfs_release(), which frees \a type.  Returning
                 * without it would leak the allocation.
                 */
                kobject_put(&type->typ_kobj);
                return ERR_PTR(rc);
        }

        symlink = debugfs_create_dir(name, debugfs_lustre_root);
        type->typ_debugfs_entry = symlink;
        type->typ_sym_filter = true;

        if (enable_proc) {
                type->typ_procroot = lprocfs_register(name, proc_lustre_root,
                                                      NULL, NULL);
                if (IS_ERR(type->typ_procroot)) {
                        CERROR("%s: can't create compat proc entry: %d\n",
                               name, (int)PTR_ERR(type->typ_procroot));
                        /* keep going without the /proc entry */
                        type->typ_procroot = NULL;
                }
        }

        return type;
}
EXPORT_SYMBOL(class_add_symlinks);
#endif /* HAVE_SERVER_SUPPORT */
224
/* Upper bound on a type name length, enforced by class_register_type(). */
#define CLASS_MAX_NAME 1024

/**
 * Register an obd type.
 *
 * Creates (or, with server support, adopts a placeholder made by
 * class_add_symlinks()) the obd_type called \a name, installs its
 * operation tables, creates its /proc and debugfs entries, adds its
 * kobject to lustre_kset and initialises \a ldt if one is given.
 *
 * \param[in] dt_ops       obd operations stored in typ_dt_ops
 * \param[in] md_ops       metadata operations stored in typ_md_ops
 * \param[in] enable_proc  create a /proc directory for the type
 * \param[in] vars         variables passed to ldebugfs_add_vars()
 * \param[in] name         type name, shorter than CLASS_MAX_NAME
 * \param[in] ldt          lu_device_type to initialise, may be NULL
 *
 * \retval 0        success
 * \retval -EEXIST  a non-placeholder type of this name already exists
 * \retval -ENOMEM  allocation failure
 * \retval negative error from lprocfs_register(), kobject_add() or
 *                  lu_device_type_init()
 */
int class_register_type(const struct obd_ops *dt_ops,
                        const struct md_ops *md_ops,
                        bool enable_proc, struct lprocfs_vars *vars,
                        const char *name, struct lu_device_type *ldt)
{
        struct obd_type *type;
        int rc;

        ENTRY;
        /* sanity check */
        LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);

        type = class_search_type(name);
        if (type) {
#ifdef HAVE_SERVER_SUPPORT
                /* a placeholder from class_add_symlinks(): reuse it */
                if (type->typ_sym_filter)
                        goto dir_exist;
#endif /* HAVE_SERVER_SUPPORT */
                /* drop the reference taken by class_search_type() */
                kobject_put(&type->typ_kobj);
                CDEBUG(D_IOCTL, "Type %s already registered\n", name);
                RETURN(-EEXIST);
        }

        OBD_ALLOC(type, sizeof(*type));
        if (type == NULL)
                RETURN(-ENOMEM);

        /* typ_lu holds the OBD_LU_TYPE_SETUP marker until the
         * smp_store_release() below publishes the final value
         */
        type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
        type->typ_kobj.kset = lustre_kset;
        kobject_init(&type->typ_kobj, &class_ktype);
#ifdef HAVE_SERVER_SUPPORT
dir_exist:
#endif /* HAVE_SERVER_SUPPORT */

        type->typ_dt_ops = dt_ops;
        type->typ_md_ops = md_ops;

#ifdef HAVE_SERVER_SUPPORT
        if (type->typ_sym_filter) {
                /* The placeholder was already added to the kset; clear the
                 * flag, drop the class_search_type() reference and skip
                 * straight to the lu_device_type setup.
                 */
                type->typ_sym_filter = false;
                kobject_put(&type->typ_kobj);
                goto setup_ldt;
        }
#endif
#ifdef CONFIG_PROC_FS
        if (enable_proc && !type->typ_procroot) {
                type->typ_procroot = lprocfs_register(name,
                                                      proc_lustre_root,
                                                      NULL, type);
                if (IS_ERR(type->typ_procroot)) {
                        rc = PTR_ERR(type->typ_procroot);
                        /* reset so class_sysfs_release() does not try to
                         * remove a /proc subtree that was never created
                         */
                        type->typ_procroot = NULL;
                        GOTO(failed, rc);
                }
        }
#endif
        type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
        ldebugfs_add_vars(type->typ_debugfs_entry, vars, type);

        rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
        if (rc)
                GOTO(failed, rc);
#ifdef HAVE_SERVER_SUPPORT
setup_ldt:
#endif
        if (ldt) {
                rc = lu_device_type_init(ldt);
                /* publish the result (NULL on failure) and wake anyone
                 * waiting on typ_lu
                 */
                smp_store_release(&type->typ_lu, rc ? NULL : ldt);
                wake_up_var(&type->typ_lu);
                if (rc)
                        GOTO(failed, rc);
        }

        RETURN(0);

failed:
        /* dropping the reference lets class_sysfs_release() clean up */
        kobject_put(&type->typ_kobj);

        RETURN(rc);
}
EXPORT_SYMBOL(class_register_type);
308
309 int class_unregister_type(const char *name)
310 {
311         struct obd_type *type = class_search_type(name);
312         int rc = 0;
313         ENTRY;
314
315         if (!type) {
316                 CERROR("unknown obd type\n");
317                 RETURN(-EINVAL);
318         }
319
320         if (atomic_read(&type->typ_refcnt)) {
321                 CERROR("type %s has refcount (%d)\n", name,
322                        atomic_read(&type->typ_refcnt));
323                 /* This is a bad situation, let's make the best of it */
324                 /* Remove ops, but leave the name for debugging */
325                 type->typ_dt_ops = NULL;
326                 type->typ_md_ops = NULL;
327                 GOTO(out_put, rc = -EBUSY);
328         }
329
330         /* Put the final ref */
331         kobject_put(&type->typ_kobj);
332 out_put:
333         /* Put the ref returned by class_search_type() */
334         kobject_put(&type->typ_kobj);
335
336         RETURN(rc);
337 } /* class_unregister_type */
338 EXPORT_SYMBOL(class_unregister_type);
339
340 /**
341  * Create a new obd device.
342  *
343  * Allocate the new obd_device and initialize it.
344  *
345  * \param[in] type_name obd device type string.
346  * \param[in] name      obd device name.
347  * \param[in] uuid      obd device UUID
348  *
349  * \retval newdev         pointer to created obd_device
350  * \retval ERR_PTR(errno) on error
351  */
352 struct obd_device *class_newdev(const char *type_name, const char *name,
353                                 const char *uuid)
354 {
355         struct obd_device *newdev;
356         struct obd_type *type = NULL;
357         ENTRY;
358
359         if (strlen(name) >= MAX_OBD_NAME) {
360                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
361                 RETURN(ERR_PTR(-EINVAL));
362         }
363
364         type = class_get_type(type_name);
365         if (type == NULL){
366                 CERROR("OBD: unknown type: %s\n", type_name);
367                 RETURN(ERR_PTR(-ENODEV));
368         }
369
370         newdev = obd_device_alloc();
371         if (newdev == NULL) {
372                 class_put_type(type);
373                 RETURN(ERR_PTR(-ENOMEM));
374         }
375         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
376         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
377         newdev->obd_type = type;
378         newdev->obd_minor = -1;
379
380         rwlock_init(&newdev->obd_pool_lock);
381         newdev->obd_pool_limit = 0;
382         newdev->obd_pool_slv = 0;
383
384         INIT_LIST_HEAD(&newdev->obd_exports);
385         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
386         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
387         INIT_LIST_HEAD(&newdev->obd_exports_timed);
388         INIT_LIST_HEAD(&newdev->obd_nid_stats);
389         spin_lock_init(&newdev->obd_nid_lock);
390         spin_lock_init(&newdev->obd_dev_lock);
391         mutex_init(&newdev->obd_dev_mutex);
392         spin_lock_init(&newdev->obd_osfs_lock);
393         /* newdev->obd_osfs_age must be set to a value in the distant
394          * past to guarantee a fresh statfs is fetched on mount. */
395         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
396
397         /* XXX belongs in setup not attach  */
398         init_rwsem(&newdev->obd_observer_link_sem);
399         /* recovery data */
400         spin_lock_init(&newdev->obd_recovery_task_lock);
401         init_waitqueue_head(&newdev->obd_next_transno_waitq);
402         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
403         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
404         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
405         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
406         INIT_LIST_HEAD(&newdev->obd_evict_list);
407         INIT_LIST_HEAD(&newdev->obd_lwp_list);
408
409         llog_group_init(&newdev->obd_olg);
410         /* Detach drops this */
411         atomic_set(&newdev->obd_refcount, 1);
412         lu_ref_init(&newdev->obd_reference);
413         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
414
415         newdev->obd_conn_inprogress = 0;
416
417         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
418
419         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
420                newdev->obd_name, newdev);
421
422         return newdev;
423 }
424
425 /**
426  * Free obd device.
427  *
428  * \param[in] obd obd_device to be freed
429  *
430  * \retval none
431  */
432 void class_free_dev(struct obd_device *obd)
433 {
434         struct obd_type *obd_type = obd->obd_type;
435
436         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
437                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
438         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
439                  "obd %p != obd_devs[%d] %p\n",
440                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
441         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
442                  "obd_refcount should be 0, not %d\n",
443                  atomic_read(&obd->obd_refcount));
444         LASSERT(obd_type != NULL);
445
446         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
447                obd->obd_name, obd->obd_type->typ_name);
448
449         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
450                          obd->obd_name, obd->obd_uuid.uuid);
451         if (obd->obd_stopping) {
452                 int err;
453
454                 /* If we're not stopping, we were never set up */
455                 err = obd_cleanup(obd);
456                 if (err)
457                         CERROR("Cleanup %s returned %d\n",
458                                 obd->obd_name, err);
459         }
460
461         obd_device_free(obd);
462
463         class_put_type(obd_type);
464 }
465
466 /**
467  * Unregister obd device.
468  *
469  * Free slot in obd_dev[] used by \a obd.
470  *
471  * \param[in] new_obd obd_device to be unregistered
472  *
473  * \retval none
474  */
475 void class_unregister_device(struct obd_device *obd)
476 {
477         write_lock(&obd_dev_lock);
478         if (obd->obd_minor >= 0) {
479                 LASSERT(obd_devs[obd->obd_minor] == obd);
480                 obd_devs[obd->obd_minor] = NULL;
481                 obd->obd_minor = -1;
482         }
483         write_unlock(&obd_dev_lock);
484 }
485
486 /**
487  * Register obd device.
488  *
489  * Find free slot in obd_devs[], fills it with \a new_obd.
490  *
491  * \param[in] new_obd obd_device to be registered
492  *
493  * \retval 0          success
494  * \retval -EEXIST    device with this name is registered
495  * \retval -EOVERFLOW obd_devs[] is full
496  */
497 int class_register_device(struct obd_device *new_obd)
498 {
499         int ret = 0;
500         int i;
501         int new_obd_minor = 0;
502         bool minor_assign = false;
503         bool retried = false;
504
505 again:
506         write_lock(&obd_dev_lock);
507         for (i = 0; i < class_devno_max(); i++) {
508                 struct obd_device *obd = class_num2obd(i);
509
510                 if (obd != NULL &&
511                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
512
513                         if (!retried) {
514                                 write_unlock(&obd_dev_lock);
515
516                                 /* the obd_device could be waited to be
517                                  * destroyed by the "obd_zombie_impexp_thread".
518                                  */
519                                 obd_zombie_barrier();
520                                 retried = true;
521                                 goto again;
522                         }
523
524                         CERROR("%s: already exists, won't add\n",
525                                obd->obd_name);
526                         /* in case we found a free slot before duplicate */
527                         minor_assign = false;
528                         ret = -EEXIST;
529                         break;
530                 }
531                 if (!minor_assign && obd == NULL) {
532                         new_obd_minor = i;
533                         minor_assign = true;
534                 }
535         }
536
537         if (minor_assign) {
538                 new_obd->obd_minor = new_obd_minor;
539                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
540                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
541                 obd_devs[new_obd_minor] = new_obd;
542         } else {
543                 if (ret == 0) {
544                         ret = -EOVERFLOW;
545                         CERROR("%s: all %u/%u devices used, increase "
546                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
547                                i, class_devno_max(), ret);
548                 }
549         }
550         write_unlock(&obd_dev_lock);
551
552         RETURN(ret);
553 }
554
555 static int class_name2dev_nolock(const char *name)
556 {
557         int i;
558
559         if (!name)
560                 return -1;
561
562         for (i = 0; i < class_devno_max(); i++) {
563                 struct obd_device *obd = class_num2obd(i);
564
565                 if (obd && strcmp(name, obd->obd_name) == 0) {
566                         /* Make sure we finished attaching before we give
567                            out any references */
568                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
569                         if (obd->obd_attached) {
570                                 return i;
571                         }
572                         break;
573                 }
574         }
575
576         return -1;
577 }
578
579 int class_name2dev(const char *name)
580 {
581         int i;
582
583         if (!name)
584                 return -1;
585
586         read_lock(&obd_dev_lock);
587         i = class_name2dev_nolock(name);
588         read_unlock(&obd_dev_lock);
589
590         return i;
591 }
592 EXPORT_SYMBOL(class_name2dev);
593
594 struct obd_device *class_name2obd(const char *name)
595 {
596         int dev = class_name2dev(name);
597
598         if (dev < 0 || dev > class_devno_max())
599                 return NULL;
600         return class_num2obd(dev);
601 }
602 EXPORT_SYMBOL(class_name2obd);
603
604 int class_uuid2dev_nolock(struct obd_uuid *uuid)
605 {
606         int i;
607
608         for (i = 0; i < class_devno_max(); i++) {
609                 struct obd_device *obd = class_num2obd(i);
610
611                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
612                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
613                         return i;
614                 }
615         }
616
617         return -1;
618 }
619
620 int class_uuid2dev(struct obd_uuid *uuid)
621 {
622         int i;
623
624         read_lock(&obd_dev_lock);
625         i = class_uuid2dev_nolock(uuid);
626         read_unlock(&obd_dev_lock);
627
628         return i;
629 }
630 EXPORT_SYMBOL(class_uuid2dev);
631
632 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
633 {
634         int dev = class_uuid2dev(uuid);
635         if (dev < 0)
636                 return NULL;
637         return class_num2obd(dev);
638 }
639 EXPORT_SYMBOL(class_uuid2obd);
640
641 /**
642  * Get obd device from ::obd_devs[]
643  *
644  * \param num [in] array index
645  *
646  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
647  *         otherwise return the obd device there.
648  */
649 struct obd_device *class_num2obd(int num)
650 {
651         struct obd_device *obd = NULL;
652
653         if (num < class_devno_max()) {
654                 obd = obd_devs[num];
655                 if (obd == NULL)
656                         return NULL;
657
658                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
659                          "%p obd_magic %08x != %08x\n",
660                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
661                 LASSERTF(obd->obd_minor == num,
662                          "%p obd_minor %0d != %0d\n",
663                          obd, obd->obd_minor, num);
664         }
665
666         return obd;
667 }
668 EXPORT_SYMBOL(class_num2obd);
669
670 /**
671  * Find obd in obd_dev[] by name or uuid.
672  *
673  * Increment obd's refcount if found.
674  *
675  * \param[in] str obd name or uuid
676  *
677  * \retval NULL    if not found
678  * \retval target  pointer to found obd_device
679  */
680 struct obd_device *class_dev_by_str(const char *str)
681 {
682         struct obd_device *target = NULL;
683         struct obd_uuid tgtuuid;
684         int rc;
685
686         obd_str2uuid(&tgtuuid, str);
687
688         read_lock(&obd_dev_lock);
689         rc = class_uuid2dev_nolock(&tgtuuid);
690         if (rc < 0)
691                 rc = class_name2dev_nolock(str);
692
693         if (rc >= 0)
694                 target = class_num2obd(rc);
695
696         if (target != NULL)
697                 class_incref(target, "find", current);
698         read_unlock(&obd_dev_lock);
699
700         RETURN(target);
701 }
702 EXPORT_SYMBOL(class_dev_by_str);
703
704 /**
705  * Get obd devices count. Device in any
706  *    state are counted
707  * \retval obd device count
708  */
709 int get_devices_count(void)
710 {
711         int index, max_index = class_devno_max(), dev_count = 0;
712
713         read_lock(&obd_dev_lock);
714         for (index = 0; index <= max_index; index++) {
715                 struct obd_device *obd = class_num2obd(index);
716                 if (obd != NULL)
717                         dev_count++;
718         }
719         read_unlock(&obd_dev_lock);
720
721         return dev_count;
722 }
723 EXPORT_SYMBOL(get_devices_count);
724
725 void class_obd_list(void)
726 {
727         char *status;
728         int i;
729
730         read_lock(&obd_dev_lock);
731         for (i = 0; i < class_devno_max(); i++) {
732                 struct obd_device *obd = class_num2obd(i);
733
734                 if (obd == NULL)
735                         continue;
736                 if (obd->obd_stopping)
737                         status = "ST";
738                 else if (obd->obd_set_up)
739                         status = "UP";
740                 else if (obd->obd_attached)
741                         status = "AT";
742                 else
743                         status = "--";
744                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
745                          i, status, obd->obd_type->typ_name,
746                          obd->obd_name, obd->obd_uuid.uuid,
747                          atomic_read(&obd->obd_refcount));
748         }
749         read_unlock(&obd_dev_lock);
750 }
751
752 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
753  * specified, then only the client with that uuid is returned,
754  * otherwise any client connected to the tgt is returned.
755  */
756 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
757                                          const char *type_name,
758                                          struct obd_uuid *grp_uuid)
759 {
760         int i;
761
762         read_lock(&obd_dev_lock);
763         for (i = 0; i < class_devno_max(); i++) {
764                 struct obd_device *obd = class_num2obd(i);
765
766                 if (obd == NULL)
767                         continue;
768                 if ((strncmp(obd->obd_type->typ_name, type_name,
769                              strlen(type_name)) == 0)) {
770                         if (obd_uuid_equals(tgt_uuid,
771                                             &obd->u.cli.cl_target_uuid) &&
772                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
773                                                          &obd->obd_uuid) : 1)) {
774                                 read_unlock(&obd_dev_lock);
775                                 return obd;
776                         }
777                 }
778         }
779         read_unlock(&obd_dev_lock);
780
781         return NULL;
782 }
783 EXPORT_SYMBOL(class_find_client_obd);
784
785 /* Iterate the obd_device list looking devices have grp_uuid. Start
786  * searching at *next, and if a device is found, the next index to look
787  * at is saved in *next. If next is NULL, then the first matching device
788  * will always be returned.
789  */
790 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
791 {
792         int i;
793
794         if (next == NULL)
795                 i = 0;
796         else if (*next >= 0 && *next < class_devno_max())
797                 i = *next;
798         else
799                 return NULL;
800
801         read_lock(&obd_dev_lock);
802         for (; i < class_devno_max(); i++) {
803                 struct obd_device *obd = class_num2obd(i);
804
805                 if (obd == NULL)
806                         continue;
807                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
808                         if (next != NULL)
809                                 *next = i+1;
810                         read_unlock(&obd_dev_lock);
811                         return obd;
812                 }
813         }
814         read_unlock(&obd_dev_lock);
815
816         return NULL;
817 }
818 EXPORT_SYMBOL(class_devices_in_group);
819
820 /**
821  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
822  * adjust sptlrpc settings accordingly.
823  */
824 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
825 {
826         struct obd_device  *obd;
827         const char         *type;
828         int                 i, rc = 0, rc2;
829
830         LASSERT(namelen > 0);
831
832         read_lock(&obd_dev_lock);
833         for (i = 0; i < class_devno_max(); i++) {
834                 obd = class_num2obd(i);
835
836                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
837                         continue;
838
839                 /* only notify mdc, osc, osp, lwp, mdt, ost
840                  * because only these have a -sptlrpc llog */
841                 type = obd->obd_type->typ_name;
842                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
843                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
844                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
845                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
846                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
847                     strcmp(type, LUSTRE_OST_NAME) != 0)
848                         continue;
849
850                 if (strncmp(obd->obd_name, fsname, namelen))
851                         continue;
852
853                 class_incref(obd, __FUNCTION__, obd);
854                 read_unlock(&obd_dev_lock);
855                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
856                                          sizeof(KEY_SPTLRPC_CONF),
857                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
858                 rc = rc ? rc : rc2;
859                 class_decref(obd, __FUNCTION__, obd);
860                 read_lock(&obd_dev_lock);
861         }
862         read_unlock(&obd_dev_lock);
863         return rc;
864 }
865 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
866
867 void obd_cleanup_caches(void)
868 {
869         ENTRY;
870         if (obd_device_cachep) {
871                 kmem_cache_destroy(obd_device_cachep);
872                 obd_device_cachep = NULL;
873         }
874
875         EXIT;
876 }
877
878 int obd_init_caches(void)
879 {
880         int rc;
881         ENTRY;
882
883         LASSERT(obd_device_cachep == NULL);
884         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
885                                 sizeof(struct obd_device),
886                                 0, 0, 0, sizeof(struct obd_device), NULL);
887         if (!obd_device_cachep)
888                 GOTO(out, rc = -ENOMEM);
889
890         RETURN(0);
891 out:
892         obd_cleanup_caches();
893         RETURN(rc);
894 }
895
/* Owner tag passed to class_handle2object() when looking up exports. */
static const char export_handle_owner[] = "export";
897
898 /* map connection to client */
899 struct obd_export *class_conn2export(struct lustre_handle *conn)
900 {
901         struct obd_export *export;
902         ENTRY;
903
904         if (!conn) {
905                 CDEBUG(D_CACHE, "looking for null handle\n");
906                 RETURN(NULL);
907         }
908
909         if (conn->cookie == -1) {  /* this means assign a new connection */
910                 CDEBUG(D_CACHE, "want a new connection\n");
911                 RETURN(NULL);
912         }
913
914         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
915         export = class_handle2object(conn->cookie, export_handle_owner);
916         RETURN(export);
917 }
918 EXPORT_SYMBOL(class_conn2export);
919
920 struct obd_device *class_exp2obd(struct obd_export *exp)
921 {
922         if (exp)
923                 return exp->exp_obd;
924         return NULL;
925 }
926 EXPORT_SYMBOL(class_exp2obd);
927
928 struct obd_import *class_exp2cliimp(struct obd_export *exp)
929 {
930         struct obd_device *obd = exp->exp_obd;
931         if (obd == NULL)
932                 return NULL;
933         return obd->u.cli.cl_import;
934 }
935 EXPORT_SYMBOL(class_exp2cliimp);
936
937 /* Export management functions */
938 static void class_export_destroy(struct obd_export *exp)
939 {
940         struct obd_device *obd = exp->exp_obd;
941         ENTRY;
942
943         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
944         LASSERT(obd != NULL);
945
946         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
947                exp->exp_client_uuid.uuid, obd->obd_name);
948
949         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
950         if (exp->exp_connection)
951                 ptlrpc_put_connection_superhack(exp->exp_connection);
952
953         LASSERT(list_empty(&exp->exp_outstanding_replies));
954         LASSERT(list_empty(&exp->exp_uncommitted_replies));
955         LASSERT(list_empty(&exp->exp_req_replay_queue));
956         LASSERT(list_empty(&exp->exp_hp_rpcs));
957         obd_destroy_export(exp);
958         /* self export doesn't hold a reference to an obd, although it
959          * exists until freeing of the obd */
960         if (exp != obd->obd_self_export)
961                 class_decref(obd, "export", exp);
962
963         OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
964         kfree_rcu(exp, exp_handle.h_rcu);
965         EXIT;
966 }
967
968 struct obd_export *class_export_get(struct obd_export *exp)
969 {
970         refcount_inc(&exp->exp_handle.h_ref);
971         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
972                refcount_read(&exp->exp_handle.h_ref));
973         return exp;
974 }
975 EXPORT_SYMBOL(class_export_get);
976
977 void class_export_put(struct obd_export *exp)
978 {
979         LASSERT(exp != NULL);
980         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
981         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
982         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
983                refcount_read(&exp->exp_handle.h_ref) - 1);
984
985         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
986                 struct obd_device *obd = exp->exp_obd;
987
988                 CDEBUG(D_IOCTL, "final put %p/%s\n",
989                        exp, exp->exp_client_uuid.uuid);
990
991                 /* release nid stat refererence */
992                 lprocfs_exp_cleanup(exp);
993
994                 if (exp == obd->obd_self_export) {
995                         /* self export should be destroyed without
996                          * zombie thread as it doesn't hold a
997                          * reference to obd and doesn't hold any
998                          * resources */
999                         class_export_destroy(exp);
1000                         /* self export is destroyed, no class
1001                          * references exist and it is safe to free
1002                          * obd */
1003                         class_free_dev(obd);
1004                 } else {
1005                         LASSERT(!list_empty(&exp->exp_obd_chain));
1006                         obd_zombie_export_add(exp);
1007                 }
1008
1009         }
1010 }
1011 EXPORT_SYMBOL(class_export_put);
1012
1013 static void obd_zombie_exp_cull(struct work_struct *ws)
1014 {
1015         struct obd_export *export;
1016
1017         export = container_of(ws, struct obd_export, exp_zombie_work);
1018         class_export_destroy(export);
1019 }
1020
1021 /* Creates a new export, adds it to the hash table, and returns a
1022  * pointer to it. The refcount is 2: one for the hash reference, and
1023  * one for the pointer returned by this function. */
1024 struct obd_export *__class_new_export(struct obd_device *obd,
1025                                       struct obd_uuid *cluuid, bool is_self)
1026 {
1027         struct obd_export *export;
1028         int rc = 0;
1029         ENTRY;
1030
1031         OBD_ALLOC_PTR(export);
1032         if (!export)
1033                 return ERR_PTR(-ENOMEM);
1034
1035         export->exp_conn_cnt = 0;
1036         export->exp_lock_hash = NULL;
1037         export->exp_flock_hash = NULL;
1038         /* 2 = class_handle_hash + last */
1039         refcount_set(&export->exp_handle.h_ref, 2);
1040         atomic_set(&export->exp_rpc_count, 0);
1041         atomic_set(&export->exp_cb_count, 0);
1042         atomic_set(&export->exp_locks_count, 0);
1043 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1044         INIT_LIST_HEAD(&export->exp_locks_list);
1045         spin_lock_init(&export->exp_locks_list_guard);
1046 #endif
1047         atomic_set(&export->exp_replay_count, 0);
1048         export->exp_obd = obd;
1049         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1050         spin_lock_init(&export->exp_uncommitted_replies_lock);
1051         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1052         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1053         INIT_HLIST_NODE(&export->exp_handle.h_link);
1054         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1055         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1056         class_handle_hash(&export->exp_handle, export_handle_owner);
1057         export->exp_last_request_time = ktime_get_real_seconds();
1058         spin_lock_init(&export->exp_lock);
1059         spin_lock_init(&export->exp_rpc_lock);
1060         INIT_HLIST_NODE(&export->exp_gen_hash);
1061         spin_lock_init(&export->exp_bl_list_lock);
1062         INIT_LIST_HEAD(&export->exp_bl_list);
1063         INIT_LIST_HEAD(&export->exp_stale_list);
1064         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1065
1066         export->exp_sp_peer = LUSTRE_SP_ANY;
1067         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1068         export->exp_client_uuid = *cluuid;
1069         obd_init_export(export);
1070
1071         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1072
1073         spin_lock(&obd->obd_dev_lock);
1074         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1075                 /* shouldn't happen, but might race */
1076                 if (obd->obd_stopping)
1077                         GOTO(exit_unlock, rc = -ENODEV);
1078
1079                 rc = obd_uuid_add(obd, export);
1080                 if (rc != 0) {
1081                         LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1082                                       obd->obd_name, cluuid->uuid, rc);
1083                         GOTO(exit_unlock, rc = -EALREADY);
1084                 }
1085         }
1086
1087         if (!is_self) {
1088                 class_incref(obd, "export", export);
1089                 list_add_tail(&export->exp_obd_chain_timed,
1090                               &obd->obd_exports_timed);
1091                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1092                 obd->obd_num_exports++;
1093         } else {
1094                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1095                 INIT_LIST_HEAD(&export->exp_obd_chain);
1096         }
1097         spin_unlock(&obd->obd_dev_lock);
1098         RETURN(export);
1099
1100 exit_unlock:
1101         spin_unlock(&obd->obd_dev_lock);
1102         class_handle_unhash(&export->exp_handle);
1103         obd_destroy_export(export);
1104         OBD_FREE_PTR(export);
1105         return ERR_PTR(rc);
1106 }
1107
/* Create a regular (non-self) export for \a uuid on \a obd. */
struct obd_export *class_new_export(struct obd_device *obd,
				    struct obd_uuid *uuid)
{
	const bool is_self = false;

	return __class_new_export(obd, uuid, is_self);
}
EXPORT_SYMBOL(class_new_export);
1114
/* Create the self export of \a obd. */
struct obd_export *class_new_export_self(struct obd_device *obd,
					 struct obd_uuid *uuid)
{
	const bool is_self = true;

	return __class_new_export(obd, uuid, is_self);
}
1120
1121 void class_unlink_export(struct obd_export *exp)
1122 {
1123         class_handle_unhash(&exp->exp_handle);
1124
1125         if (exp->exp_obd->obd_self_export == exp) {
1126                 class_export_put(exp);
1127                 return;
1128         }
1129
1130         spin_lock(&exp->exp_obd->obd_dev_lock);
1131         /* delete an uuid-export hashitem from hashtables */
1132         if (exp != exp->exp_obd->obd_self_export)
1133                 obd_uuid_del(exp->exp_obd, exp);
1134
1135 #ifdef HAVE_SERVER_SUPPORT
1136         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1137                 struct tg_export_data   *ted = &exp->exp_target_data;
1138                 struct cfs_hash         *hash;
1139
1140                 /* Because obd_gen_hash will not be released until
1141                  * class_cleanup(), so hash should never be NULL here */
1142                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1143                 LASSERT(hash != NULL);
1144                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1145                              &exp->exp_gen_hash);
1146                 cfs_hash_putref(hash);
1147         }
1148 #endif /* HAVE_SERVER_SUPPORT */
1149
1150         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1151         list_del_init(&exp->exp_obd_chain_timed);
1152         exp->exp_obd->obd_num_exports--;
1153         spin_unlock(&exp->exp_obd->obd_dev_lock);
1154         atomic_inc(&obd_stale_export_num);
1155
1156         /* A reference is kept by obd_stale_exports list */
1157         obd_stale_export_put(exp);
1158 }
1159 EXPORT_SYMBOL(class_unlink_export);
1160
1161 /* Import management functions */
1162 static void obd_zombie_import_free(struct obd_import *imp)
1163 {
1164         ENTRY;
1165
1166         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1167                 imp->imp_obd->obd_name);
1168
1169         LASSERT(refcount_read(&imp->imp_refcount) == 0);
1170
1171         ptlrpc_put_connection_superhack(imp->imp_connection);
1172
1173         while (!list_empty(&imp->imp_conn_list)) {
1174                 struct obd_import_conn *imp_conn;
1175
1176                 imp_conn = list_first_entry(&imp->imp_conn_list,
1177                                             struct obd_import_conn, oic_item);
1178                 list_del_init(&imp_conn->oic_item);
1179                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1180                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1181         }
1182
1183         LASSERT(imp->imp_sec == NULL);
1184         LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1185                  imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1186         class_decref(imp->imp_obd, "import", imp);
1187         OBD_FREE_PTR(imp);
1188         EXIT;
1189 }
1190
1191 struct obd_import *class_import_get(struct obd_import *import)
1192 {
1193         refcount_inc(&import->imp_refcount);
1194         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1195                refcount_read(&import->imp_refcount),
1196                import->imp_obd->obd_name);
1197         return import;
1198 }
1199 EXPORT_SYMBOL(class_import_get);
1200
1201 void class_import_put(struct obd_import *imp)
1202 {
1203         ENTRY;
1204
1205         LASSERT(refcount_read(&imp->imp_refcount) > 0);
1206
1207         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1208                refcount_read(&imp->imp_refcount) - 1,
1209                imp->imp_obd->obd_name);
1210
1211         if (refcount_dec_and_test(&imp->imp_refcount)) {
1212                 CDEBUG(D_INFO, "final put import %p\n", imp);
1213                 obd_zombie_import_add(imp);
1214         }
1215
1216         EXIT;
1217 }
1218 EXPORT_SYMBOL(class_import_put);
1219
1220 static void init_imp_at(struct imp_at *at) {
1221         int i;
1222         at_init(&at->iat_net_latency, 0, 0);
1223         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1224                 /* max service estimates are tracked on the server side, so
1225                    don't use the AT history here, just use the last reported
1226                    val. (But keep hist for proc histogram, worst_ever) */
1227                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1228                         AT_FLG_NOHIST);
1229         }
1230 }
1231
1232 static void obd_zombie_imp_cull(struct work_struct *ws)
1233 {
1234         struct obd_import *import;
1235
1236         import = container_of(ws, struct obd_import, imp_zombie_work);
1237         obd_zombie_import_free(import);
1238 }
1239
1240 struct obd_import *class_new_import(struct obd_device *obd)
1241 {
1242         struct obd_import *imp;
1243         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1244
1245         OBD_ALLOC(imp, sizeof(*imp));
1246         if (imp == NULL)
1247                 return NULL;
1248
1249         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1250         INIT_LIST_HEAD(&imp->imp_replay_list);
1251         INIT_LIST_HEAD(&imp->imp_sending_list);
1252         INIT_LIST_HEAD(&imp->imp_delayed_list);
1253         INIT_LIST_HEAD(&imp->imp_committed_list);
1254         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1255         imp->imp_known_replied_xid = 0;
1256         imp->imp_replay_cursor = &imp->imp_committed_list;
1257         spin_lock_init(&imp->imp_lock);
1258         imp->imp_last_success_conn = 0;
1259         imp->imp_state = LUSTRE_IMP_NEW;
1260         imp->imp_obd = class_incref(obd, "import", imp);
1261         rwlock_init(&imp->imp_sec_lock);
1262         init_waitqueue_head(&imp->imp_recovery_waitq);
1263         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1264
1265         if (curr_pid_ns && curr_pid_ns->child_reaper)
1266                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1267         else
1268                 imp->imp_sec_refpid = 1;
1269
1270         refcount_set(&imp->imp_refcount, 2);
1271         atomic_set(&imp->imp_unregistering, 0);
1272         atomic_set(&imp->imp_reqs, 0);
1273         atomic_set(&imp->imp_inflight, 0);
1274         atomic_set(&imp->imp_replay_inflight, 0);
1275         init_waitqueue_head(&imp->imp_replay_waitq);
1276         atomic_set(&imp->imp_inval_count, 0);
1277         INIT_LIST_HEAD(&imp->imp_conn_list);
1278         init_imp_at(&imp->imp_at);
1279
1280         /* the default magic is V2, will be used in connect RPC, and
1281          * then adjusted according to the flags in request/reply. */
1282         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1283
1284         return imp;
1285 }
1286 EXPORT_SYMBOL(class_new_import);
1287
1288 void class_destroy_import(struct obd_import *import)
1289 {
1290         LASSERT(import != NULL);
1291         LASSERT(import != LP_POISON);
1292
1293         spin_lock(&import->imp_lock);
1294         import->imp_generation++;
1295         spin_unlock(&import->imp_lock);
1296         class_import_put(import);
1297 }
1298 EXPORT_SYMBOL(class_destroy_import);
1299
1300 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1301
1302 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1303 {
1304         spin_lock(&exp->exp_locks_list_guard);
1305
1306         LASSERT(lock->l_exp_refs_nr >= 0);
1307
1308         if (lock->l_exp_refs_target != NULL &&
1309             lock->l_exp_refs_target != exp) {
1310                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1311                               exp, lock, lock->l_exp_refs_target);
1312         }
1313         if ((lock->l_exp_refs_nr ++) == 0) {
1314                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1315                 lock->l_exp_refs_target = exp;
1316         }
1317         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1318                lock, exp, lock->l_exp_refs_nr);
1319         spin_unlock(&exp->exp_locks_list_guard);
1320 }
1321 EXPORT_SYMBOL(__class_export_add_lock_ref);
1322
1323 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1324 {
1325         spin_lock(&exp->exp_locks_list_guard);
1326         LASSERT(lock->l_exp_refs_nr > 0);
1327         if (lock->l_exp_refs_target != exp) {
1328                 LCONSOLE_WARN("lock %p, "
1329                               "mismatching export pointers: %p, %p\n",
1330                               lock, lock->l_exp_refs_target, exp);
1331         }
1332         if (-- lock->l_exp_refs_nr == 0) {
1333                 list_del_init(&lock->l_exp_refs_link);
1334                 lock->l_exp_refs_target = NULL;
1335         }
1336         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1337                lock, exp, lock->l_exp_refs_nr);
1338         spin_unlock(&exp->exp_locks_list_guard);
1339 }
1340 EXPORT_SYMBOL(__class_export_del_lock_ref);
1341 #endif
1342
1343 /* A connection defines an export context in which preallocation can
1344    be managed. This releases the export pointer reference, and returns
1345    the export handle, so the export refcount is 1 when this function
1346    returns. */
1347 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1348                   struct obd_uuid *cluuid)
1349 {
1350         struct obd_export *export;
1351         LASSERT(conn != NULL);
1352         LASSERT(obd != NULL);
1353         LASSERT(cluuid != NULL);
1354         ENTRY;
1355
1356         export = class_new_export(obd, cluuid);
1357         if (IS_ERR(export))
1358                 RETURN(PTR_ERR(export));
1359
1360         conn->cookie = export->exp_handle.h_cookie;
1361         class_export_put(export);
1362
1363         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1364                cluuid->uuid, conn->cookie);
1365         RETURN(0);
1366 }
1367 EXPORT_SYMBOL(class_connect);
1368
1369 /* if export is involved in recovery then clean up related things */
1370 static void class_export_recovery_cleanup(struct obd_export *exp)
1371 {
1372         struct obd_device *obd = exp->exp_obd;
1373
1374         spin_lock(&obd->obd_recovery_task_lock);
1375         if (obd->obd_recovering) {
1376                 if (exp->exp_in_recovery) {
1377                         spin_lock(&exp->exp_lock);
1378                         exp->exp_in_recovery = 0;
1379                         spin_unlock(&exp->exp_lock);
1380                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1381                         atomic_dec(&obd->obd_connected_clients);
1382                 }
1383
1384                 /* if called during recovery then should update
1385                  * obd_stale_clients counter,
1386                  * lightweight exports are not counted */
1387                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1388                         exp->exp_obd->obd_stale_clients++;
1389         }
1390         spin_unlock(&obd->obd_recovery_task_lock);
1391
1392         spin_lock(&exp->exp_lock);
1393         /** Cleanup req replay fields */
1394         if (exp->exp_req_replay_needed) {
1395                 exp->exp_req_replay_needed = 0;
1396
1397                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1398                 atomic_dec(&obd->obd_req_replay_clients);
1399         }
1400
1401         /** Cleanup lock replay data */
1402         if (exp->exp_lock_replay_needed) {
1403                 exp->exp_lock_replay_needed = 0;
1404
1405                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1406                 atomic_dec(&obd->obd_lock_replay_clients);
1407         }
1408         spin_unlock(&exp->exp_lock);
1409 }
1410
1411 /* This function removes 1-3 references from the export:
1412  * 1 - for export pointer passed
1413  * and if disconnect really need
1414  * 2 - removing from hash
1415  * 3 - in client_unlink_export
1416  * The export pointer passed to this function can destroyed */
1417 int class_disconnect(struct obd_export *export)
1418 {
1419         int already_disconnected;
1420         ENTRY;
1421
1422         if (export == NULL) {
1423                 CWARN("attempting to free NULL export %p\n", export);
1424                 RETURN(-EINVAL);
1425         }
1426
1427         spin_lock(&export->exp_lock);
1428         already_disconnected = export->exp_disconnected;
1429         export->exp_disconnected = 1;
1430 #ifdef HAVE_SERVER_SUPPORT
1431         /*  We hold references of export for uuid hash
1432          *  and nid_hash and export link at least. So
1433          *  it is safe to call rh*table_remove_fast in
1434          *  there.
1435          */
1436         obd_nid_del(export->exp_obd, export);
1437 #endif /* HAVE_SERVER_SUPPORT */
1438         spin_unlock(&export->exp_lock);
1439
1440         /* class_cleanup(), abort_recovery(), and class_fail_export()
1441          * all end up in here, and if any of them race we shouldn't
1442          * call extra class_export_puts(). */
1443         if (already_disconnected)
1444                 GOTO(no_disconn, already_disconnected);
1445
1446         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1447                export->exp_handle.h_cookie);
1448
1449         class_export_recovery_cleanup(export);
1450         class_unlink_export(export);
1451 no_disconn:
1452         class_export_put(export);
1453         RETURN(0);
1454 }
1455 EXPORT_SYMBOL(class_disconnect);
1456
1457 /* Return non-zero for a fully connected export */
1458 int class_connected_export(struct obd_export *exp)
1459 {
1460         int connected = 0;
1461
1462         if (exp) {
1463                 spin_lock(&exp->exp_lock);
1464                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1465                 spin_unlock(&exp->exp_lock);
1466         }
1467         return connected;
1468 }
1469 EXPORT_SYMBOL(class_connected_export);
1470
1471 static void class_disconnect_export_list(struct list_head *list,
1472                                          enum obd_option flags)
1473 {
1474         int rc;
1475         struct obd_export *exp;
1476         ENTRY;
1477
1478         /* It's possible that an export may disconnect itself, but
1479          * nothing else will be added to this list. */
1480         while (!list_empty(list)) {
1481                 exp = list_first_entry(list, struct obd_export,
1482                                        exp_obd_chain);
1483                 /* need for safe call CDEBUG after obd_disconnect */
1484                 class_export_get(exp);
1485
1486                 spin_lock(&exp->exp_lock);
1487                 exp->exp_flags = flags;
1488                 spin_unlock(&exp->exp_lock);
1489
1490                 if (obd_uuid_equals(&exp->exp_client_uuid,
1491                                     &exp->exp_obd->obd_uuid)) {
1492                         CDEBUG(D_HA,
1493                                "exp %p export uuid == obd uuid, don't discon\n",
1494                                exp);
1495                         /* Need to delete this now so we don't end up pointing
1496                          * to work_list later when this export is cleaned up. */
1497                         list_del_init(&exp->exp_obd_chain);
1498                         class_export_put(exp);
1499                         continue;
1500                 }
1501
1502                 class_export_get(exp);
1503                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1504                        "last request at %lld\n",
1505                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1506                        exp, exp->exp_last_request_time);
1507                 /* release one export reference anyway */
1508                 rc = obd_disconnect(exp);
1509
1510                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1511                        obd_export_nid2str(exp), exp, rc);
1512                 class_export_put(exp);
1513         }
1514         EXIT;
1515 }
1516
1517 void class_disconnect_exports(struct obd_device *obd)
1518 {
1519         LIST_HEAD(work_list);
1520         ENTRY;
1521
1522         /* Move all of the exports from obd_exports to a work list, en masse. */
1523         spin_lock(&obd->obd_dev_lock);
1524         list_splice_init(&obd->obd_exports, &work_list);
1525         list_splice_init(&obd->obd_delayed_exports, &work_list);
1526         spin_unlock(&obd->obd_dev_lock);
1527
1528         if (!list_empty(&work_list)) {
1529                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1530                        "disconnecting them\n", obd->obd_minor, obd);
1531                 class_disconnect_export_list(&work_list,
1532                                              exp_flags_from_obd(obd));
1533         } else
1534                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1535                        obd->obd_minor, obd);
1536         EXIT;
1537 }
1538 EXPORT_SYMBOL(class_disconnect_exports);
1539
1540 /* Remove exports that have not completed recovery.
1541  */
1542 void class_disconnect_stale_exports(struct obd_device *obd,
1543                                     int (*test_export)(struct obd_export *))
1544 {
1545         LIST_HEAD(work_list);
1546         struct obd_export *exp, *n;
1547         int evicted = 0;
1548         ENTRY;
1549
1550         spin_lock(&obd->obd_dev_lock);
1551         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1552                                  exp_obd_chain) {
1553                 /* don't count self-export as client */
1554                 if (obd_uuid_equals(&exp->exp_client_uuid,
1555                                     &exp->exp_obd->obd_uuid))
1556                         continue;
1557
1558                 /* don't evict clients which have no slot in last_rcvd
1559                  * (e.g. lightweight connection) */
1560                 if (exp->exp_target_data.ted_lr_idx == -1)
1561                         continue;
1562
1563                 spin_lock(&exp->exp_lock);
1564                 if (exp->exp_failed || test_export(exp)) {
1565                         spin_unlock(&exp->exp_lock);
1566                         continue;
1567                 }
1568                 exp->exp_failed = 1;
1569                 spin_unlock(&exp->exp_lock);
1570
1571                 list_move(&exp->exp_obd_chain, &work_list);
1572                 evicted++;
1573                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1574                        obd->obd_name, exp->exp_client_uuid.uuid,
1575                        obd_export_nid2str(exp));
1576                 print_export_data(exp, "EVICTING", 0, D_HA);
1577         }
1578         spin_unlock(&obd->obd_dev_lock);
1579
1580         if (evicted)
1581                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1582                               obd->obd_name, evicted);
1583
1584         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1585                                                  OBD_OPT_ABORT_RECOV);
1586         EXIT;
1587 }
1588 EXPORT_SYMBOL(class_disconnect_stale_exports);
1589
1590 void class_fail_export(struct obd_export *exp)
1591 {
1592         int rc, already_failed;
1593
1594         spin_lock(&exp->exp_lock);
1595         already_failed = exp->exp_failed;
1596         exp->exp_failed = 1;
1597         spin_unlock(&exp->exp_lock);
1598
1599         if (already_failed) {
1600                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1601                        exp, exp->exp_client_uuid.uuid);
1602                 return;
1603         }
1604
1605         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1606                exp, exp->exp_client_uuid.uuid);
1607
1608         if (obd_dump_on_timeout)
1609                 libcfs_debug_dumplog();
1610
1611         /* need for safe call CDEBUG after obd_disconnect */
1612         class_export_get(exp);
1613
1614         /* Most callers into obd_disconnect are removing their own reference
1615          * (request, for example) in addition to the one from the hash table.
1616          * We don't have such a reference here, so make one. */
1617         class_export_get(exp);
1618         rc = obd_disconnect(exp);
1619         if (rc)
1620                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1621         else
1622                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1623                        exp, exp->exp_client_uuid.uuid);
1624         class_export_put(exp);
1625 }
1626 EXPORT_SYMBOL(class_fail_export);
1627
1628 #ifdef HAVE_SERVER_SUPPORT
1629 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1630 {
1631         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1632         struct obd_export *doomed_exp;
1633         struct rhashtable_iter iter;
1634         int exports_evicted = 0;
1635
1636         spin_lock(&obd->obd_dev_lock);
1637         /* umount has run already, so evict thread should leave
1638          * its task to umount thread now */
1639         if (obd->obd_stopping) {
1640                 spin_unlock(&obd->obd_dev_lock);
1641                 return exports_evicted;
1642         }
1643         spin_unlock(&obd->obd_dev_lock);
1644
1645         rhltable_walk_enter(&obd->obd_nid_hash, &iter);
1646         rhashtable_walk_start(&iter);
1647         while ((doomed_exp = rhashtable_walk_next(&iter)) != NULL) {
1648                 if (IS_ERR(doomed_exp))
1649                         continue;
1650
1651                 if (!doomed_exp->exp_connection ||
1652                     doomed_exp->exp_connection->c_peer.nid != nid_key)
1653                         continue;
1654
1655                 if (!refcount_inc_not_zero(&doomed_exp->exp_handle.h_ref))
1656                         continue;
1657
1658                 rhashtable_walk_stop(&iter);
1659
1660                 LASSERTF(doomed_exp != obd->obd_self_export,
1661                          "self-export is hashed by NID?\n");
1662
1663                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1664                               obd->obd_name,
1665                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1666                               obd_export_nid2str(doomed_exp));
1667
1668                 class_fail_export(doomed_exp);
1669                 class_export_put(doomed_exp);
1670                 exports_evicted++;
1671
1672                 rhashtable_walk_start(&iter);
1673         }
1674         rhashtable_walk_stop(&iter);
1675         rhashtable_walk_exit(&iter);
1676
1677         if (!exports_evicted)
1678                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1679                        obd->obd_name, nid);
1680         return exports_evicted;
1681 }
1682 EXPORT_SYMBOL(obd_export_evict_by_nid);
1683
1684 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1685 {
1686         struct obd_export *doomed_exp = NULL;
1687         struct obd_uuid doomed_uuid;
1688         int exports_evicted = 0;
1689
1690         spin_lock(&obd->obd_dev_lock);
1691         if (obd->obd_stopping) {
1692                 spin_unlock(&obd->obd_dev_lock);
1693                 return exports_evicted;
1694         }
1695         spin_unlock(&obd->obd_dev_lock);
1696
1697         obd_str2uuid(&doomed_uuid, uuid);
1698         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1699                 CERROR("%s: can't evict myself\n", obd->obd_name);
1700                 return exports_evicted;
1701         }
1702
1703         doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1704         if (doomed_exp == NULL) {
1705                 CERROR("%s: can't disconnect %s: no exports found\n",
1706                        obd->obd_name, uuid);
1707         } else {
1708                 CWARN("%s: evicting %s at adminstrative request\n",
1709                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1710                 class_fail_export(doomed_exp);
1711                 class_export_put(doomed_exp);
1712                 obd_uuid_del(obd, doomed_exp);
1713                 exports_evicted++;
1714         }
1715
1716         return exports_evicted;
1717 }
1718 #endif /* HAVE_SERVER_SUPPORT */
1719
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback, NULL by default.  When it is set and a lock dump is
 * requested, print_export_data() calls it for the export being printed. */
void (*class_export_dump_hook)(struct obd_export*) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1724
1725 static void print_export_data(struct obd_export *exp, const char *status,
1726                               int locks, int debug_level)
1727 {
1728         struct ptlrpc_reply_state *rs;
1729         struct ptlrpc_reply_state *first_reply = NULL;
1730         int nreplies = 0;
1731
1732         spin_lock(&exp->exp_lock);
1733         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1734                             rs_exp_list) {
1735                 if (nreplies == 0)
1736                         first_reply = rs;
1737                 nreplies++;
1738         }
1739         spin_unlock(&exp->exp_lock);
1740
1741         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1742                "%p %s %llu stale:%d\n",
1743                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1744                obd_export_nid2str(exp),
1745                refcount_read(&exp->exp_handle.h_ref),
1746                atomic_read(&exp->exp_rpc_count),
1747                atomic_read(&exp->exp_cb_count),
1748                atomic_read(&exp->exp_locks_count),
1749                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1750                nreplies, first_reply, nreplies > 3 ? "..." : "",
1751                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1752 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1753         if (locks && class_export_dump_hook != NULL)
1754                 class_export_dump_hook(exp);
1755 #endif
1756 }
1757
1758 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1759 {
1760         struct obd_export *exp;
1761
1762         spin_lock(&obd->obd_dev_lock);
1763         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1764                 print_export_data(exp, "ACTIVE", locks, debug_level);
1765         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1766                 print_export_data(exp, "UNLINKED", locks, debug_level);
1767         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1768                 print_export_data(exp, "DELAYED", locks, debug_level);
1769         spin_unlock(&obd->obd_dev_lock);
1770 }
1771
1772 void obd_exports_barrier(struct obd_device *obd)
1773 {
1774         int waited = 2;
1775         LASSERT(list_empty(&obd->obd_exports));
1776         spin_lock(&obd->obd_dev_lock);
1777         while (!list_empty(&obd->obd_unlinked_exports)) {
1778                 spin_unlock(&obd->obd_dev_lock);
1779                 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1780                 if (waited > 5 && is_power_of_2(waited)) {
1781                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1782                                       "more than %d seconds. "
1783                                       "The obd refcount = %d. Is it stuck?\n",
1784                                       obd->obd_name, waited,
1785                                       atomic_read(&obd->obd_refcount));
1786                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1787                 }
1788                 waited *= 2;
1789                 spin_lock(&obd->obd_dev_lock);
1790         }
1791         spin_unlock(&obd->obd_dev_lock);
1792 }
1793 EXPORT_SYMBOL(obd_exports_barrier);
1794
1795 /**
1796  * Add export to the obd_zombe thread and notify it.
1797  */
1798 static void obd_zombie_export_add(struct obd_export *exp) {
1799         atomic_dec(&obd_stale_export_num);
1800         spin_lock(&exp->exp_obd->obd_dev_lock);
1801         LASSERT(!list_empty(&exp->exp_obd_chain));
1802         list_del_init(&exp->exp_obd_chain);
1803         spin_unlock(&exp->exp_obd->obd_dev_lock);
1804
1805         queue_work(zombie_wq, &exp->exp_zombie_work);
1806 }
1807
1808 /**
1809  * Add import to the obd_zombe thread and notify it.
1810  */
1811 static void obd_zombie_import_add(struct obd_import *imp) {
1812         LASSERT(imp->imp_sec == NULL);
1813
1814         queue_work(zombie_wq, &imp->imp_zombie_work);
1815 }
1816
/**
 * Wait until the zombie import/export workqueue has been flushed, i.e.
 * the work items queued on zombie_wq so far have completed.
 */
void obd_zombie_barrier(void)
{
        flush_workqueue(zombie_wq);
}
EXPORT_SYMBOL(obd_zombie_barrier);
1825
1826
1827 struct obd_export *obd_stale_export_get(void)
1828 {
1829         struct obd_export *exp = NULL;
1830         ENTRY;
1831
1832         spin_lock(&obd_stale_export_lock);
1833         if (!list_empty(&obd_stale_exports)) {
1834                 exp = list_first_entry(&obd_stale_exports,
1835                                        struct obd_export, exp_stale_list);
1836                 list_del_init(&exp->exp_stale_list);
1837         }
1838         spin_unlock(&obd_stale_export_lock);
1839
1840         if (exp) {
1841                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1842                        atomic_read(&obd_stale_export_num));
1843         }
1844         RETURN(exp);
1845 }
1846 EXPORT_SYMBOL(obd_stale_export_get);
1847
1848 void obd_stale_export_put(struct obd_export *exp)
1849 {
1850         ENTRY;
1851
1852         LASSERT(list_empty(&exp->exp_stale_list));
1853         if (exp->exp_lock_hash &&
1854             atomic_read(&exp->exp_lock_hash->hs_count)) {
1855                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1856                        atomic_read(&obd_stale_export_num));
1857
1858                 spin_lock_bh(&exp->exp_bl_list_lock);
1859                 spin_lock(&obd_stale_export_lock);
1860                 /* Add to the tail if there is no blocked locks,
1861                  * to the head otherwise. */
1862                 if (list_empty(&exp->exp_bl_list))
1863                         list_add_tail(&exp->exp_stale_list,
1864                                       &obd_stale_exports);
1865                 else
1866                         list_add(&exp->exp_stale_list,
1867                                  &obd_stale_exports);
1868
1869                 spin_unlock(&obd_stale_export_lock);
1870                 spin_unlock_bh(&exp->exp_bl_list_lock);
1871         } else {
1872                 class_export_put(exp);
1873         }
1874         EXIT;
1875 }
1876 EXPORT_SYMBOL(obd_stale_export_put);
1877
1878 /**
1879  * Adjust the position of the export in the stale list,
1880  * i.e. move to the head of the list if is needed.
1881  **/
1882 void obd_stale_export_adjust(struct obd_export *exp)
1883 {
1884         LASSERT(exp != NULL);
1885         spin_lock_bh(&exp->exp_bl_list_lock);
1886         spin_lock(&obd_stale_export_lock);
1887
1888         if (!list_empty(&exp->exp_stale_list) &&
1889             !list_empty(&exp->exp_bl_list))
1890                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1891
1892         spin_unlock(&obd_stale_export_lock);
1893         spin_unlock_bh(&exp->exp_bl_list_lock);
1894 }
1895 EXPORT_SYMBOL(obd_stale_export_adjust);
1896
1897 /**
1898  * start destroy zombie import/export thread
1899  */
1900 int obd_zombie_impexp_init(void)
1901 {
1902         zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
1903                                            0, CFS_CPT_ANY,
1904                                            cfs_cpt_number(cfs_cpt_tab));
1905
1906         return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
1907 }
1908
/**
 * Destroy the zombie import/export workqueue.  The global stale-export
 * list is asserted to be empty once the workqueue is gone.
 */
void obd_zombie_impexp_stop(void)
{
        destroy_workqueue(zombie_wq);
        LASSERT(list_empty(&obd_stale_exports));
}
1917
1918 /***** Kernel-userspace comm helpers *******/
1919
1920 /* Get length of entire message, including header */
1921 int kuc_len(int payload_len)
1922 {
1923         return sizeof(struct kuc_hdr) + payload_len;
1924 }
1925 EXPORT_SYMBOL(kuc_len);
1926
1927 /* Get a pointer to kuc header, given a ptr to the payload
1928  * @param p Pointer to payload area
1929  * @returns Pointer to kuc header
1930  */
1931 struct kuc_hdr * kuc_ptr(void *p)
1932 {
1933         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1934         LASSERT(lh->kuc_magic == KUC_MAGIC);
1935         return lh;
1936 }
1937 EXPORT_SYMBOL(kuc_ptr);
1938
1939 /* Alloc space for a message, and fill in header
1940  * @return Pointer to payload area
1941  */
1942 void *kuc_alloc(int payload_len, int transport, int type)
1943 {
1944         struct kuc_hdr *lh;
1945         int len = kuc_len(payload_len);
1946
1947         OBD_ALLOC(lh, len);
1948         if (lh == NULL)
1949                 return ERR_PTR(-ENOMEM);
1950
1951         lh->kuc_magic = KUC_MAGIC;
1952         lh->kuc_transport = transport;
1953         lh->kuc_msgtype = type;
1954         lh->kuc_msglen = len;
1955
1956         return (void *)(lh + 1);
1957 }
1958 EXPORT_SYMBOL(kuc_alloc);
1959
/* Free a message given a pointer to its payload area and the payload
 * length; the enclosing header is located with kuc_ptr().
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh;

        lh = kuc_ptr(p);
        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1967
/* Per-caller record used by obd_get_request_slot() while it waits for an
 * RPC slot; the instance lives on the waiting caller's stack. */
struct obd_request_slot_waiter {
        /* link on client_obd::cl_flight_waiters; emptied (list_del_init)
         * by whoever grants the slot */
        struct list_head        orsw_entry;
        /* wait queue the waiting caller sleeps on */
        wait_queue_head_t       orsw_waitq;
        /* initialised to false by obd_get_request_slot(), which returns
         * -EINTR if it finds it set.
         * NOTE(review): no code in this part of the file sets it to true;
         * confirm where (or whether) it is raised. */
        bool                    orsw_signaled;
};
1973
1974 static bool obd_request_slot_avail(struct client_obd *cli,
1975                                    struct obd_request_slot_waiter *orsw)
1976 {
1977         bool avail;
1978
1979         spin_lock(&cli->cl_loi_list_lock);
1980         avail = !!list_empty(&orsw->orsw_entry);
1981         spin_unlock(&cli->cl_loi_list_lock);
1982
1983         return avail;
1984 };
1985
/*
 * For network flow control, the RPC sponsor needs to acquire a credit
 * before sending the RPC. The credits count for a connection is defined
 * by the "cl_max_rpcs_in_flight". If all the credits are occupied, then
 * the subsequent RPC sponsors need to wait until others released their
 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
 *
 * Returns 0 once a credit has been acquired, or -EINTR if the wait ended
 * with a non-zero result (or with orsw_signaled set); in the -EINTR case
 * no credit is held on return.
 */
int obd_get_request_slot(struct client_obd *cli)
{
        struct obd_request_slot_waiter   orsw;
        int                              rc;

        /* Fast path: a credit is free, take it and return immediately. */
        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
                cli->cl_rpcs_in_flight++;
                spin_unlock(&cli->cl_loi_list_lock);
                return 0;
        }

        /* Slow path: queue ourselves at the tail of the waiters list and
         * sleep until a granter removes our entry from the list. */
        init_waitqueue_head(&orsw.orsw_waitq);
        list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
        orsw.orsw_signaled = false;
        spin_unlock(&cli->cl_loi_list_lock);

        rc = l_wait_event_abortable(orsw.orsw_waitq,
                                    obd_request_slot_avail(cli, &orsw) ||
                                    orsw.orsw_signaled);

        /* Here, we must take the lock to avoid the on-stack 'orsw' to be
         * freed but other (such as obd_put_request_slot) is using it. */
        spin_lock(&cli->cl_loi_list_lock);
        if (rc != 0) {
                /* The wait did not complete normally. */
                if (!orsw.orsw_signaled) {
                        /* An empty entry means a granter already dequeued
                         * us and accounted a credit on our behalf: give
                         * that credit back.  Otherwise we are still queued
                         * and only need to leave the list. */
                        if (list_empty(&orsw.orsw_entry))
                                cli->cl_rpcs_in_flight--;
                        else
                                list_del(&orsw.orsw_entry);
                }
                rc = -EINTR;
        }

        if (orsw.orsw_signaled) {
                LASSERT(list_empty(&orsw.orsw_entry));

                rc = -EINTR;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        return rc;
}
EXPORT_SYMBOL(obd_get_request_slot);
2037
2038 void obd_put_request_slot(struct client_obd *cli)
2039 {
2040         struct obd_request_slot_waiter *orsw;
2041
2042         spin_lock(&cli->cl_loi_list_lock);
2043         cli->cl_rpcs_in_flight--;
2044
2045         /* If there is free slot, wakeup the first waiter. */
2046         if (!list_empty(&cli->cl_flight_waiters) &&
2047             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2048                 orsw = list_first_entry(&cli->cl_flight_waiters,
2049                                         struct obd_request_slot_waiter,
2050                                         orsw_entry);
2051                 list_del_init(&orsw->orsw_entry);
2052                 cli->cl_rpcs_in_flight++;
2053                 wake_up(&orsw->orsw_waitq);
2054         }
2055         spin_unlock(&cli->cl_loi_list_lock);
2056 }
2057 EXPORT_SYMBOL(obd_put_request_slot);
2058
/* Return the current cl_max_rpcs_in_flight limit of \a cli.  The value is
 * read without taking cl_loi_list_lock. */
__u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_max_rpcs_in_flight;
}
EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2064
2065 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2066 {
2067         struct obd_request_slot_waiter *orsw;
2068         __u32                           old;
2069         int                             diff;
2070         int                             i;
2071         int                             rc;
2072
2073         if (max > OBD_MAX_RIF_MAX || max < 1)
2074                 return -ERANGE;
2075
2076         CDEBUG(D_INFO, "%s: max = %hu max_mod = %u rif = %u\n",
2077                cli->cl_import->imp_obd->obd_name, max,
2078                cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
2079
2080         if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
2081                    LUSTRE_MDC_NAME) == 0) {
2082                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2083                  * strictly lower that max_rpcs_in_flight */
2084                 if (max < 2) {
2085                         CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2086                                cli->cl_import->imp_obd->obd_name);
2087                         return -ERANGE;
2088                 }
2089                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2090                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2091                         if (rc != 0)
2092                                 return rc;
2093                 }
2094         }
2095
2096         spin_lock(&cli->cl_loi_list_lock);
2097         old = cli->cl_max_rpcs_in_flight;
2098         cli->cl_max_rpcs_in_flight = max;
2099         client_adjust_max_dirty(cli);
2100
2101         diff = max - old;
2102
2103         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2104         for (i = 0; i < diff; i++) {
2105                 if (list_empty(&cli->cl_flight_waiters))
2106                         break;
2107
2108                 orsw = list_first_entry(&cli->cl_flight_waiters,
2109                                         struct obd_request_slot_waiter,
2110                                         orsw_entry);
2111                 list_del_init(&orsw->orsw_entry);
2112                 cli->cl_rpcs_in_flight++;
2113                 wake_up(&orsw->orsw_waitq);
2114         }
2115         spin_unlock(&cli->cl_loi_list_lock);
2116
2117         return 0;
2118 }
2119 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2120
/* Return the current cl_max_mod_rpcs_in_flight limit of \a cli.  The value
 * is read without taking cl_mod_rpcs_lock. */
__u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_max_mod_rpcs_in_flight;
}
EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2126
2127 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2128 {
2129         struct obd_connect_data *ocd;
2130         __u16 maxmodrpcs;
2131         __u16 prev;
2132
2133         if (max > OBD_MAX_RIF_MAX || max < 1)
2134                 return -ERANGE;
2135
2136         ocd = &cli->cl_import->imp_connect_data;
2137         CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
2138                cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
2139                ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
2140
2141         if (max == OBD_MAX_RIF_MAX)
2142                 max = OBD_MAX_RIF_MAX - 1;
2143
2144         /* Cannot exceed or equal max_rpcs_in_flight.  If we are asked to
2145          * increase this value, also bump up max_rpcs_in_flight to match.
2146          */
2147         if (max >= cli->cl_max_rpcs_in_flight) {
2148                 CDEBUG(D_INFO,
2149                        "%s: increasing max_rpcs_in_flight=%hu to allow larger max_mod_rpcs_in_flight=%u\n",
2150                        cli->cl_import->imp_obd->obd_name, max + 1, max);
2151                 obd_set_max_rpcs_in_flight(cli, max + 1);
2152         }
2153
2154         /* cannot exceed max modify RPCs in flight supported by the server,
2155          * but verify ocd_connect_flags is at least initialized first.  If
2156          * not, allow it and fix value later in ptlrpc_connect_set_flags().
2157          */
2158         if (!ocd->ocd_connect_flags) {
2159                 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2160         } else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
2161                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2162                 if (maxmodrpcs == 0) { /* connection not finished yet */
2163                         maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2164                         CDEBUG(D_INFO,
2165                                "%s: partial connect, assume maxmodrpcs=%hu\n",
2166                                cli->cl_import->imp_obd->obd_name, maxmodrpcs);
2167                 }
2168         } else {
2169                 maxmodrpcs = 1;
2170         }
2171         if (max > maxmodrpcs) {
2172                 CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than ocd_maxmodrpcs=%hu returned by the server at connection\n",
2173                        cli->cl_import->imp_obd->obd_name,
2174                        max, maxmodrpcs);
2175                 return -ERANGE;
2176         }
2177
2178         spin_lock(&cli->cl_mod_rpcs_lock);
2179
2180         prev = cli->cl_max_mod_rpcs_in_flight;
2181         cli->cl_max_mod_rpcs_in_flight = max;
2182
2183         /* wakeup waiters if limit has been increased */
2184         if (cli->cl_max_mod_rpcs_in_flight > prev)
2185                 wake_up(&cli->cl_mod_rpcs_waitq);
2186
2187         spin_unlock(&cli->cl_mod_rpcs_lock);
2188
2189         return 0;
2190 }
2191 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2192
2193 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2194                                struct seq_file *seq)
2195 {
2196         unsigned long mod_tot = 0, mod_cum;
2197         struct timespec64 now;
2198         int i;
2199
2200         ktime_get_real_ts64(&now);
2201
2202         spin_lock(&cli->cl_mod_rpcs_lock);
2203
2204         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2205                    (s64)now.tv_sec, now.tv_nsec);
2206         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2207                    cli->cl_mod_rpcs_in_flight);
2208
2209         seq_printf(seq, "\n\t\t\tmodify\n");
2210         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2211
2212         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2213
2214         mod_cum = 0;
2215         for (i = 0; i < OBD_HIST_MAX; i++) {
2216                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2217                 mod_cum += mod;
2218                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2219                            i, mod, pct(mod, mod_tot),
2220                            pct(mod_cum, mod_tot));
2221                 if (mod_cum == mod_tot)
2222                         break;
2223         }
2224
2225         spin_unlock(&cli->cl_mod_rpcs_lock);
2226
2227         return 0;
2228 }
2229 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2230
2231 /* The number of modify RPCs sent in parallel is limited
2232  * because the server has a finite number of slots per client to
2233  * store request result and ensure reply reconstruction when needed.
2234  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2235  * that takes into account server limit and cl_max_rpcs_in_flight
2236  * value.
2237  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2238  * one close request is allowed above the maximum.
2239  */
2240 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2241                                                  bool close_req)
2242 {
2243         bool avail;
2244
2245         /* A slot is available if
2246          * - number of modify RPCs in flight is less than the max
2247          * - it's a close RPC and no other close request is in flight
2248          */
2249         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2250                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2251
2252         return avail;
2253 }
2254
2255 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2256                                          bool close_req)
2257 {
2258         bool avail;
2259
2260         spin_lock(&cli->cl_mod_rpcs_lock);
2261         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2262         spin_unlock(&cli->cl_mod_rpcs_lock);
2263         return avail;
2264 }
2265
2266
2267 /* Get a modify RPC slot from the obd client @cli according
2268  * to the kind of operation @opc that is going to be sent
2269  * and the intent @it of the operation if it applies.
2270  * If the maximum number of modify RPCs in flight is reached
2271  * the thread is put to sleep.
2272  * Returns the tag to be set in the request message. Tag 0
2273  * is reserved for non-modifying requests.
2274  */
2275 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2276 {
2277         bool                    close_req = false;
2278         __u16                   i, max;
2279
2280         if (opc == MDS_CLOSE)
2281                 close_req = true;
2282
2283         do {
2284                 spin_lock(&cli->cl_mod_rpcs_lock);
2285                 max = cli->cl_max_mod_rpcs_in_flight;
2286                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2287                         /* there is a slot available */
2288                         cli->cl_mod_rpcs_in_flight++;
2289                         if (close_req)
2290                                 cli->cl_close_rpcs_in_flight++;
2291                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2292                                          cli->cl_mod_rpcs_in_flight);
2293                         /* find a free tag */
2294                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2295                                                 max + 1);
2296                         LASSERT(i < OBD_MAX_RIF_MAX);
2297                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2298                         spin_unlock(&cli->cl_mod_rpcs_lock);
2299                         /* tag 0 is reserved for non-modify RPCs */
2300
2301                         CDEBUG(D_RPCTRACE,
2302                                "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2303                                cli->cl_import->imp_obd->obd_name,
2304                                i + 1, opc, max);
2305
2306                         return i + 1;
2307                 }
2308                 spin_unlock(&cli->cl_mod_rpcs_lock);
2309
2310                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2311                        "opc %u, max %hu\n",
2312                        cli->cl_import->imp_obd->obd_name, opc, max);
2313
2314                 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2315                                           obd_mod_rpc_slot_avail(cli,
2316                                                                  close_req));
2317         } while (true);
2318 }
2319 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2320
2321 /* Put a modify RPC slot from the obd client @cli according
2322  * to the kind of operation @opc that has been sent.
2323  */
2324 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2325 {
2326         bool                    close_req = false;
2327
2328         if (tag == 0)
2329                 return;
2330
2331         if (opc == MDS_CLOSE)
2332                 close_req = true;
2333
2334         spin_lock(&cli->cl_mod_rpcs_lock);
2335         cli->cl_mod_rpcs_in_flight--;
2336         if (close_req)
2337                 cli->cl_close_rpcs_in_flight--;
2338         /* release the tag in the bitmap */
2339         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2340         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2341         spin_unlock(&cli->cl_mod_rpcs_lock);
2342         wake_up(&cli->cl_mod_rpcs_waitq);
2343 }
2344 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2345