Whamcloud - gitweb
LU-5951 ptlrpc: track unreplied requests
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43
44 #include <linux/kthread.h>
45 #include <obd_class.h>
46 #include <lprocfs_status.h>
47 #include <lustre_disk.h>
48 #include <lustre_kernelcomm.h>
49
50 spinlock_t obd_types_lock;
51
52 static struct kmem_cache *obd_device_cachep;
53 struct kmem_cache *obdo_cachep;
54 EXPORT_SYMBOL(obdo_cachep);
55 static struct kmem_cache *import_cachep;
56
57 static struct list_head obd_zombie_imports;
58 static struct list_head obd_zombie_exports;
59 static spinlock_t  obd_zombie_impexp_lock;
60
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
67 struct list_head obd_stale_exports;
68 spinlock_t       obd_stale_export_lock;
69 atomic_t         obd_stale_export_num;
70
71 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
72 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
73
74 /*
75  * support functions: we could use inter-module communication, but this
76  * is more portable to other OS's
77  */
78 static struct obd_device *obd_device_alloc(void)
79 {
80         struct obd_device *obd;
81
82         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
83         if (obd != NULL) {
84                 obd->obd_magic = OBD_DEVICE_MAGIC;
85         }
86         return obd;
87 }
88
89 static void obd_device_free(struct obd_device *obd)
90 {
91         LASSERT(obd != NULL);
92         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
93                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
94         if (obd->obd_namespace != NULL) {
95                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
96                        obd, obd->obd_namespace, obd->obd_force);
97                 LBUG();
98         }
99         lu_ref_fini(&obd->obd_reference);
100         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
101 }
102
103 struct obd_type *class_search_type(const char *name)
104 {
105         struct list_head *tmp;
106         struct obd_type *type;
107
108         spin_lock(&obd_types_lock);
109         list_for_each(tmp, &obd_types) {
110                 type = list_entry(tmp, struct obd_type, typ_chain);
111                 if (strcmp(type->typ_name, name) == 0) {
112                         spin_unlock(&obd_types_lock);
113                         return type;
114                 }
115         }
116         spin_unlock(&obd_types_lock);
117         return NULL;
118 }
119 EXPORT_SYMBOL(class_search_type);
120
121 struct obd_type *class_get_type(const char *name)
122 {
123         struct obd_type *type = class_search_type(name);
124
125 #ifdef HAVE_MODULE_LOADING_SUPPORT
126         if (!type) {
127                 const char *modname = name;
128
129                 if (strcmp(modname, "obdfilter") == 0)
130                         modname = "ofd";
131
132                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
133                         modname = LUSTRE_OSP_NAME;
134
135                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
136                         modname = LUSTRE_MDT_NAME;
137
138                 if (!request_module("%s", modname)) {
139                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
140                         type = class_search_type(name);
141                 } else {
142                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
143                                            modname);
144                 }
145         }
146 #endif
147         if (type) {
148                 spin_lock(&type->obd_type_lock);
149                 type->typ_refcnt++;
150                 try_module_get(type->typ_dt_ops->o_owner);
151                 spin_unlock(&type->obd_type_lock);
152         }
153         return type;
154 }
155
156 void class_put_type(struct obd_type *type)
157 {
158         LASSERT(type);
159         spin_lock(&type->obd_type_lock);
160         type->typ_refcnt--;
161         module_put(type->typ_dt_ops->o_owner);
162         spin_unlock(&type->obd_type_lock);
163 }
164
165 #define CLASS_MAX_NAME 1024
166
167 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
168                         bool enable_proc, struct lprocfs_vars *vars,
169                         const char *name, struct lu_device_type *ldt)
170 {
171         struct obd_type *type;
172         int rc = 0;
173         ENTRY;
174
175         /* sanity check */
176         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
177
178         if (class_search_type(name)) {
179                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
180                 RETURN(-EEXIST);
181         }
182
183         rc = -ENOMEM;
184         OBD_ALLOC(type, sizeof(*type));
185         if (type == NULL)
186                 RETURN(rc);
187
188         OBD_ALLOC_PTR(type->typ_dt_ops);
189         OBD_ALLOC_PTR(type->typ_md_ops);
190         OBD_ALLOC(type->typ_name, strlen(name) + 1);
191
192         if (type->typ_dt_ops == NULL ||
193             type->typ_md_ops == NULL ||
194             type->typ_name == NULL)
195                 GOTO (failed, rc);
196
197         *(type->typ_dt_ops) = *dt_ops;
198         /* md_ops is optional */
199         if (md_ops)
200                 *(type->typ_md_ops) = *md_ops;
201         strcpy(type->typ_name, name);
202         spin_lock_init(&type->obd_type_lock);
203
204 #ifdef CONFIG_PROC_FS
205         if (enable_proc) {
206                 type->typ_procroot = lprocfs_register(type->typ_name,
207                                                       proc_lustre_root,
208                                                       vars, type);
209                 if (IS_ERR(type->typ_procroot)) {
210                         rc = PTR_ERR(type->typ_procroot);
211                         type->typ_procroot = NULL;
212                         GOTO(failed, rc);
213                 }
214         }
215 #endif
216         if (ldt != NULL) {
217                 type->typ_lu = ldt;
218                 rc = lu_device_type_init(ldt);
219                 if (rc != 0)
220                         GOTO (failed, rc);
221         }
222
223         spin_lock(&obd_types_lock);
224         list_add(&type->typ_chain, &obd_types);
225         spin_unlock(&obd_types_lock);
226
227         RETURN (0);
228
229 failed:
230         if (type->typ_name != NULL) {
231 #ifdef CONFIG_PROC_FS
232                 if (type->typ_procroot != NULL)
233                         remove_proc_subtree(type->typ_name, proc_lustre_root);
234 #endif
235                 OBD_FREE(type->typ_name, strlen(name) + 1);
236         }
237         if (type->typ_md_ops != NULL)
238                 OBD_FREE_PTR(type->typ_md_ops);
239         if (type->typ_dt_ops != NULL)
240                 OBD_FREE_PTR(type->typ_dt_ops);
241         OBD_FREE(type, sizeof(*type));
242         RETURN(rc);
243 }
244 EXPORT_SYMBOL(class_register_type);
245
246 int class_unregister_type(const char *name)
247 {
248         struct obd_type *type = class_search_type(name);
249         ENTRY;
250
251         if (!type) {
252                 CERROR("unknown obd type\n");
253                 RETURN(-EINVAL);
254         }
255
256         if (type->typ_refcnt) {
257                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
258                 /* This is a bad situation, let's make the best of it */
259                 /* Remove ops, but leave the name for debugging */
260                 OBD_FREE_PTR(type->typ_dt_ops);
261                 OBD_FREE_PTR(type->typ_md_ops);
262                 RETURN(-EBUSY);
263         }
264
265         /* we do not use type->typ_procroot as for compatibility purposes
266          * other modules can share names (i.e. lod can use lov entry). so
267          * we can't reference pointer as it can get invalided when another
268          * module removes the entry */
269 #ifdef CONFIG_PROC_FS
270         if (type->typ_procroot != NULL)
271                 remove_proc_subtree(type->typ_name, proc_lustre_root);
272         if (type->typ_procsym != NULL)
273                 lprocfs_remove(&type->typ_procsym);
274 #endif
275         if (type->typ_lu)
276                 lu_device_type_fini(type->typ_lu);
277
278         spin_lock(&obd_types_lock);
279         list_del(&type->typ_chain);
280         spin_unlock(&obd_types_lock);
281         OBD_FREE(type->typ_name, strlen(name) + 1);
282         if (type->typ_dt_ops != NULL)
283                 OBD_FREE_PTR(type->typ_dt_ops);
284         if (type->typ_md_ops != NULL)
285                 OBD_FREE_PTR(type->typ_md_ops);
286         OBD_FREE(type, sizeof(*type));
287         RETURN(0);
288 } /* class_unregister_type */
289 EXPORT_SYMBOL(class_unregister_type);
290
291 /**
292  * Create a new obd device.
293  *
294  * Find an empty slot in ::obd_devs[], create a new obd device in it.
295  *
296  * \param[in] type_name obd device type string.
297  * \param[in] name      obd device name.
298  *
299  * \retval NULL if create fails, otherwise return the obd device
300  *         pointer created.
301  */
302 struct obd_device *class_newdev(const char *type_name, const char *name)
303 {
304         struct obd_device *result = NULL;
305         struct obd_device *newdev;
306         struct obd_type *type = NULL;
307         int i;
308         int new_obd_minor = 0;
309         ENTRY;
310
311         if (strlen(name) >= MAX_OBD_NAME) {
312                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
313                 RETURN(ERR_PTR(-EINVAL));
314         }
315
316         type = class_get_type(type_name);
317         if (type == NULL){
318                 CERROR("OBD: unknown type: %s\n", type_name);
319                 RETURN(ERR_PTR(-ENODEV));
320         }
321
322         newdev = obd_device_alloc();
323         if (newdev == NULL)
324                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
325
326         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
327
328         write_lock(&obd_dev_lock);
329         for (i = 0; i < class_devno_max(); i++) {
330                 struct obd_device *obd = class_num2obd(i);
331
332                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
333                         CERROR("Device %s already exists at %d, won't add\n",
334                                name, i);
335                         if (result) {
336                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
337                                          "%p obd_magic %08x != %08x\n", result,
338                                          result->obd_magic, OBD_DEVICE_MAGIC);
339                                 LASSERTF(result->obd_minor == new_obd_minor,
340                                          "%p obd_minor %d != %d\n", result,
341                                          result->obd_minor, new_obd_minor);
342
343                                 obd_devs[result->obd_minor] = NULL;
344                                 result->obd_name[0]='\0';
345                          }
346                         result = ERR_PTR(-EEXIST);
347                         break;
348                 }
349                 if (!result && !obd) {
350                         result = newdev;
351                         result->obd_minor = i;
352                         new_obd_minor = i;
353                         result->obd_type = type;
354                         strncpy(result->obd_name, name,
355                                 sizeof(result->obd_name) - 1);
356                         obd_devs[i] = result;
357                 }
358         }
359         write_unlock(&obd_dev_lock);
360
361         if (result == NULL && i >= class_devno_max()) {
362                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
363                        class_devno_max());
364                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
365         }
366
367         if (IS_ERR(result))
368                 GOTO(out, result);
369
370         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
371                result->obd_name, result);
372
373         RETURN(result);
374 out:
375         obd_device_free(newdev);
376 out_type:
377         class_put_type(type);
378         return result;
379 }
380
381 void class_release_dev(struct obd_device *obd)
382 {
383         struct obd_type *obd_type = obd->obd_type;
384
385         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
386                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
387         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
388                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
389         LASSERT(obd_type != NULL);
390
391         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
392                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
393
394         write_lock(&obd_dev_lock);
395         obd_devs[obd->obd_minor] = NULL;
396         write_unlock(&obd_dev_lock);
397         obd_device_free(obd);
398
399         class_put_type(obd_type);
400 }
401
402 int class_name2dev(const char *name)
403 {
404         int i;
405
406         if (!name)
407                 return -1;
408
409         read_lock(&obd_dev_lock);
410         for (i = 0; i < class_devno_max(); i++) {
411                 struct obd_device *obd = class_num2obd(i);
412
413                 if (obd && strcmp(name, obd->obd_name) == 0) {
414                         /* Make sure we finished attaching before we give
415                            out any references */
416                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
417                         if (obd->obd_attached) {
418                                 read_unlock(&obd_dev_lock);
419                                 return i;
420                         }
421                         break;
422                 }
423         }
424         read_unlock(&obd_dev_lock);
425
426         return -1;
427 }
428
429 struct obd_device *class_name2obd(const char *name)
430 {
431         int dev = class_name2dev(name);
432
433         if (dev < 0 || dev > class_devno_max())
434                 return NULL;
435         return class_num2obd(dev);
436 }
437 EXPORT_SYMBOL(class_name2obd);
438
439 int class_uuid2dev(struct obd_uuid *uuid)
440 {
441         int i;
442
443         read_lock(&obd_dev_lock);
444         for (i = 0; i < class_devno_max(); i++) {
445                 struct obd_device *obd = class_num2obd(i);
446
447                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
448                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
449                         read_unlock(&obd_dev_lock);
450                         return i;
451                 }
452         }
453         read_unlock(&obd_dev_lock);
454
455         return -1;
456 }
457
458 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
459 {
460         int dev = class_uuid2dev(uuid);
461         if (dev < 0)
462                 return NULL;
463         return class_num2obd(dev);
464 }
465 EXPORT_SYMBOL(class_uuid2obd);
466
467 /**
468  * Get obd device from ::obd_devs[]
469  *
470  * \param num [in] array index
471  *
472  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
473  *         otherwise return the obd device there.
474  */
475 struct obd_device *class_num2obd(int num)
476 {
477         struct obd_device *obd = NULL;
478
479         if (num < class_devno_max()) {
480                 obd = obd_devs[num];
481                 if (obd == NULL)
482                         return NULL;
483
484                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
485                          "%p obd_magic %08x != %08x\n",
486                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
487                 LASSERTF(obd->obd_minor == num,
488                          "%p obd_minor %0d != %0d\n",
489                          obd, obd->obd_minor, num);
490         }
491
492         return obd;
493 }
494
495 /**
496  * Get obd devices count. Device in any
497  *    state are counted
498  * \retval obd device count
499  */
500 int get_devices_count(void)
501 {
502         int index, max_index = class_devno_max(), dev_count = 0;
503
504         read_lock(&obd_dev_lock);
505         for (index = 0; index <= max_index; index++) {
506                 struct obd_device *obd = class_num2obd(index);
507                 if (obd != NULL)
508                         dev_count++;
509         }
510         read_unlock(&obd_dev_lock);
511
512         return dev_count;
513 }
514 EXPORT_SYMBOL(get_devices_count);
515
516 void class_obd_list(void)
517 {
518         char *status;
519         int i;
520
521         read_lock(&obd_dev_lock);
522         for (i = 0; i < class_devno_max(); i++) {
523                 struct obd_device *obd = class_num2obd(i);
524
525                 if (obd == NULL)
526                         continue;
527                 if (obd->obd_stopping)
528                         status = "ST";
529                 else if (obd->obd_set_up)
530                         status = "UP";
531                 else if (obd->obd_attached)
532                         status = "AT";
533                 else
534                         status = "--";
535                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
536                          i, status, obd->obd_type->typ_name,
537                          obd->obd_name, obd->obd_uuid.uuid,
538                          atomic_read(&obd->obd_refcount));
539         }
540         read_unlock(&obd_dev_lock);
541         return;
542 }
543
544 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
545    specified, then only the client with that uuid is returned,
546    otherwise any client connected to the tgt is returned. */
547 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
548                                           const char * typ_name,
549                                           struct obd_uuid *grp_uuid)
550 {
551         int i;
552
553         read_lock(&obd_dev_lock);
554         for (i = 0; i < class_devno_max(); i++) {
555                 struct obd_device *obd = class_num2obd(i);
556
557                 if (obd == NULL)
558                         continue;
559                 if ((strncmp(obd->obd_type->typ_name, typ_name,
560                              strlen(typ_name)) == 0)) {
561                         if (obd_uuid_equals(tgt_uuid,
562                                             &obd->u.cli.cl_target_uuid) &&
563                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
564                                                          &obd->obd_uuid) : 1)) {
565                                 read_unlock(&obd_dev_lock);
566                                 return obd;
567                         }
568                 }
569         }
570         read_unlock(&obd_dev_lock);
571
572         return NULL;
573 }
574 EXPORT_SYMBOL(class_find_client_obd);
575
576 /* Iterate the obd_device list looking devices have grp_uuid. Start
577    searching at *next, and if a device is found, the next index to look
578    at is saved in *next. If next is NULL, then the first matching device
579    will always be returned. */
580 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
581 {
582         int i;
583
584         if (next == NULL)
585                 i = 0;
586         else if (*next >= 0 && *next < class_devno_max())
587                 i = *next;
588         else
589                 return NULL;
590
591         read_lock(&obd_dev_lock);
592         for (; i < class_devno_max(); i++) {
593                 struct obd_device *obd = class_num2obd(i);
594
595                 if (obd == NULL)
596                         continue;
597                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
598                         if (next != NULL)
599                                 *next = i+1;
600                         read_unlock(&obd_dev_lock);
601                         return obd;
602                 }
603         }
604         read_unlock(&obd_dev_lock);
605
606         return NULL;
607 }
608 EXPORT_SYMBOL(class_devices_in_group);
609
610 /**
611  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
612  * adjust sptlrpc settings accordingly.
613  */
614 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
615 {
616         struct obd_device  *obd;
617         const char         *type;
618         int                 i, rc = 0, rc2;
619
620         LASSERT(namelen > 0);
621
622         read_lock(&obd_dev_lock);
623         for (i = 0; i < class_devno_max(); i++) {
624                 obd = class_num2obd(i);
625
626                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
627                         continue;
628
629                 /* only notify mdc, osc, osp, lwp, mdt, ost
630                  * because only these have a -sptlrpc llog */
631                 type = obd->obd_type->typ_name;
632                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
633                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
634                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
635                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
636                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
637                     strcmp(type, LUSTRE_OST_NAME) != 0)
638                         continue;
639
640                 if (strncmp(obd->obd_name, fsname, namelen))
641                         continue;
642
643                 class_incref(obd, __FUNCTION__, obd);
644                 read_unlock(&obd_dev_lock);
645                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
646                                          sizeof(KEY_SPTLRPC_CONF),
647                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
648                 rc = rc ? rc : rc2;
649                 class_decref(obd, __FUNCTION__, obd);
650                 read_lock(&obd_dev_lock);
651         }
652         read_unlock(&obd_dev_lock);
653         return rc;
654 }
655 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
656
657 void obd_cleanup_caches(void)
658 {
659         ENTRY;
660         if (obd_device_cachep) {
661                 kmem_cache_destroy(obd_device_cachep);
662                 obd_device_cachep = NULL;
663         }
664         if (obdo_cachep) {
665                 kmem_cache_destroy(obdo_cachep);
666                 obdo_cachep = NULL;
667         }
668         if (import_cachep) {
669                 kmem_cache_destroy(import_cachep);
670                 import_cachep = NULL;
671         }
672
673         EXIT;
674 }
675
676 int obd_init_caches(void)
677 {
678         int rc;
679         ENTRY;
680
681         LASSERT(obd_device_cachep == NULL);
682         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
683                                               sizeof(struct obd_device),
684                                               0, 0, NULL);
685         if (!obd_device_cachep)
686                 GOTO(out, rc = -ENOMEM);
687
688         LASSERT(obdo_cachep == NULL);
689         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
690                                         0, 0, NULL);
691         if (!obdo_cachep)
692                 GOTO(out, rc = -ENOMEM);
693
694         LASSERT(import_cachep == NULL);
695         import_cachep = kmem_cache_create("ll_import_cache",
696                                           sizeof(struct obd_import),
697                                           0, 0, NULL);
698         if (!import_cachep)
699                 GOTO(out, rc = -ENOMEM);
700
701         RETURN(0);
702 out:
703         obd_cleanup_caches();
704         RETURN(rc);
705 }
706
707 /* map connection to client */
708 struct obd_export *class_conn2export(struct lustre_handle *conn)
709 {
710         struct obd_export *export;
711         ENTRY;
712
713         if (!conn) {
714                 CDEBUG(D_CACHE, "looking for null handle\n");
715                 RETURN(NULL);
716         }
717
718         if (conn->cookie == -1) {  /* this means assign a new connection */
719                 CDEBUG(D_CACHE, "want a new connection\n");
720                 RETURN(NULL);
721         }
722
723         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
724         export = class_handle2object(conn->cookie, NULL);
725         RETURN(export);
726 }
727 EXPORT_SYMBOL(class_conn2export);
728
729 struct obd_device *class_exp2obd(struct obd_export *exp)
730 {
731         if (exp)
732                 return exp->exp_obd;
733         return NULL;
734 }
735 EXPORT_SYMBOL(class_exp2obd);
736
737 struct obd_device *class_conn2obd(struct lustre_handle *conn)
738 {
739         struct obd_export *export;
740         export = class_conn2export(conn);
741         if (export) {
742                 struct obd_device *obd = export->exp_obd;
743                 class_export_put(export);
744                 return obd;
745         }
746         return NULL;
747 }
748
749 struct obd_import *class_exp2cliimp(struct obd_export *exp)
750 {
751         struct obd_device *obd = exp->exp_obd;
752         if (obd == NULL)
753                 return NULL;
754         return obd->u.cli.cl_import;
755 }
756 EXPORT_SYMBOL(class_exp2cliimp);
757
758 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
759 {
760         struct obd_device *obd = class_conn2obd(conn);
761         if (obd == NULL)
762                 return NULL;
763         return obd->u.cli.cl_import;
764 }
765
766 /* Export management functions */
767 static void class_export_destroy(struct obd_export *exp)
768 {
769         struct obd_device *obd = exp->exp_obd;
770         ENTRY;
771
772         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
773         LASSERT(obd != NULL);
774
775         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
776                exp->exp_client_uuid.uuid, obd->obd_name);
777
778         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
779         if (exp->exp_connection)
780                 ptlrpc_put_connection_superhack(exp->exp_connection);
781
782         LASSERT(list_empty(&exp->exp_outstanding_replies));
783         LASSERT(list_empty(&exp->exp_uncommitted_replies));
784         LASSERT(list_empty(&exp->exp_req_replay_queue));
785         LASSERT(list_empty(&exp->exp_hp_rpcs));
786         obd_destroy_export(exp);
787         class_decref(obd, "export", exp);
788
789         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
790         EXIT;
791 }
792
/**
 * hop_addref callback for export handles: takes one reference on the
 * export via class_export_get().
 *
 * \param[in] export the struct obd_export, passed as an untyped pointer
 */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
797
798 static struct portals_handle_ops export_handle_ops = {
799         .hop_addref = export_handle_addref,
800         .hop_free   = NULL,
801 };
802
803 struct obd_export *class_export_get(struct obd_export *exp)
804 {
805         atomic_inc(&exp->exp_refcount);
806         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
807                atomic_read(&exp->exp_refcount));
808         return exp;
809 }
810 EXPORT_SYMBOL(class_export_get);
811
812 void class_export_put(struct obd_export *exp)
813 {
814         LASSERT(exp != NULL);
815         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
816         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
817                atomic_read(&exp->exp_refcount) - 1);
818
819         if (atomic_dec_and_test(&exp->exp_refcount)) {
820                 LASSERT(!list_empty(&exp->exp_obd_chain));
821                 LASSERT(list_empty(&exp->exp_stale_list));
822                 CDEBUG(D_IOCTL, "final put %p/%s\n",
823                        exp, exp->exp_client_uuid.uuid);
824
825                 /* release nid stat refererence */
826                 lprocfs_exp_cleanup(exp);
827
828                 obd_zombie_export_add(exp);
829         }
830 }
831 EXPORT_SYMBOL(class_export_put);
832
833 /* Creates a new export, adds it to the hash table, and returns a
834  * pointer to it. The refcount is 2: one for the hash reference, and
835  * one for the pointer returned by this function. */
836 struct obd_export *class_new_export(struct obd_device *obd,
837                                     struct obd_uuid *cluuid)
838 {
839         struct obd_export *export;
840         struct cfs_hash *hash = NULL;
841         int rc = 0;
842         ENTRY;
843
844         OBD_ALLOC_PTR(export);
845         if (!export)
846                 return ERR_PTR(-ENOMEM);
847
848         export->exp_conn_cnt = 0;
849         export->exp_lock_hash = NULL;
850         export->exp_flock_hash = NULL;
851         atomic_set(&export->exp_refcount, 2);
852         atomic_set(&export->exp_rpc_count, 0);
853         atomic_set(&export->exp_cb_count, 0);
854         atomic_set(&export->exp_locks_count, 0);
855 #if LUSTRE_TRACKS_LOCK_EXP_REFS
856         INIT_LIST_HEAD(&export->exp_locks_list);
857         spin_lock_init(&export->exp_locks_list_guard);
858 #endif
859         atomic_set(&export->exp_replay_count, 0);
860         export->exp_obd = obd;
861         INIT_LIST_HEAD(&export->exp_outstanding_replies);
862         spin_lock_init(&export->exp_uncommitted_replies_lock);
863         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
864         INIT_LIST_HEAD(&export->exp_req_replay_queue);
865         INIT_LIST_HEAD(&export->exp_handle.h_link);
866         INIT_LIST_HEAD(&export->exp_hp_rpcs);
867         INIT_LIST_HEAD(&export->exp_reg_rpcs);
868         class_handle_hash(&export->exp_handle, &export_handle_ops);
869         export->exp_last_request_time = cfs_time_current_sec();
870         spin_lock_init(&export->exp_lock);
871         spin_lock_init(&export->exp_rpc_lock);
872         INIT_HLIST_NODE(&export->exp_uuid_hash);
873         INIT_HLIST_NODE(&export->exp_nid_hash);
874         INIT_HLIST_NODE(&export->exp_gen_hash);
875         spin_lock_init(&export->exp_bl_list_lock);
876         INIT_LIST_HEAD(&export->exp_bl_list);
877         INIT_LIST_HEAD(&export->exp_stale_list);
878
879         export->exp_sp_peer = LUSTRE_SP_ANY;
880         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
881         export->exp_client_uuid = *cluuid;
882         obd_init_export(export);
883
884         spin_lock(&obd->obd_dev_lock);
885         /* shouldn't happen, but might race */
886         if (obd->obd_stopping)
887                 GOTO(exit_unlock, rc = -ENODEV);
888
889         hash = cfs_hash_getref(obd->obd_uuid_hash);
890         if (hash == NULL)
891                 GOTO(exit_unlock, rc = -ENODEV);
892         spin_unlock(&obd->obd_dev_lock);
893
894         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
895                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
896                 if (rc != 0) {
897                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
898                                       obd->obd_name, cluuid->uuid, rc);
899                         GOTO(exit_err, rc = -EALREADY);
900                 }
901         }
902
903         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
904         spin_lock(&obd->obd_dev_lock);
905         if (obd->obd_stopping) {
906                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
907                 GOTO(exit_unlock, rc = -ENODEV);
908         }
909
910         class_incref(obd, "export", export);
911         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
912         list_add_tail(&export->exp_obd_chain_timed,
913                       &export->exp_obd->obd_exports_timed);
914         export->exp_obd->obd_num_exports++;
915         spin_unlock(&obd->obd_dev_lock);
916         cfs_hash_putref(hash);
917         RETURN(export);
918
919 exit_unlock:
920         spin_unlock(&obd->obd_dev_lock);
921 exit_err:
922         if (hash)
923                 cfs_hash_putref(hash);
924         class_handle_unhash(&export->exp_handle);
925         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
926         obd_destroy_export(export);
927         OBD_FREE_PTR(export);
928         return ERR_PTR(rc);
929 }
930 EXPORT_SYMBOL(class_new_export);
931
932 void class_unlink_export(struct obd_export *exp)
933 {
934         class_handle_unhash(&exp->exp_handle);
935
936         spin_lock(&exp->exp_obd->obd_dev_lock);
937         /* delete an uuid-export hashitem from hashtables */
938         if (!hlist_unhashed(&exp->exp_uuid_hash))
939                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
940                              &exp->exp_client_uuid,
941                              &exp->exp_uuid_hash);
942
943         if (!hlist_unhashed(&exp->exp_gen_hash)) {
944                 struct tg_export_data   *ted = &exp->exp_target_data;
945                 struct cfs_hash         *hash;
946
947                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
948                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
949                              &exp->exp_gen_hash);
950                 cfs_hash_putref(hash);
951         }
952
953         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
954         list_del_init(&exp->exp_obd_chain_timed);
955         exp->exp_obd->obd_num_exports--;
956         spin_unlock(&exp->exp_obd->obd_dev_lock);
957         atomic_inc(&obd_stale_export_num);
958
959         /* A reference is kept by obd_stale_exports list */
960         obd_stale_export_put(exp);
961 }
962 EXPORT_SYMBOL(class_unlink_export);
963
964 /* Import management functions */
965 static void class_import_destroy(struct obd_import *imp)
966 {
967         ENTRY;
968
969         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
970                 imp->imp_obd->obd_name);
971
972         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
973
974         ptlrpc_put_connection_superhack(imp->imp_connection);
975
976         while (!list_empty(&imp->imp_conn_list)) {
977                 struct obd_import_conn *imp_conn;
978
979                 imp_conn = list_entry(imp->imp_conn_list.next,
980                                       struct obd_import_conn, oic_item);
981                 list_del_init(&imp_conn->oic_item);
982                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
983                 OBD_FREE(imp_conn, sizeof(*imp_conn));
984         }
985
986         LASSERT(imp->imp_sec == NULL);
987         class_decref(imp->imp_obd, "import", imp);
988         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
989         EXIT;
990 }
991
/* hop_addref callback for imports: takes a reference on the import. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
996
/*
 * Handle operations passed to class_handle_hash() for imports: only an
 * addref callback is supplied, no free callback.
 */
static struct portals_handle_ops import_handle_ops = {
        .hop_addref = import_handle_addref,
        .hop_free   = NULL,
};
1001
1002 struct obd_import *class_import_get(struct obd_import *import)
1003 {
1004         atomic_inc(&import->imp_refcount);
1005         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1006                atomic_read(&import->imp_refcount),
1007                import->imp_obd->obd_name);
1008         return import;
1009 }
1010 EXPORT_SYMBOL(class_import_get);
1011
/*
 * Drop one reference on @imp.
 *
 * On entry the import must not be on a zombie chain and its refcount must
 * be positive and below LI_POISON.  When the count reaches zero the import
 * is handed to obd_zombie_import_add() rather than destroyed here.
 */
void class_import_put(struct obd_import *imp)
{
        ENTRY;

        LASSERT(list_empty(&imp->imp_zombie_chain));
        LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);

        /* the logged value is the count expected after the decrement below */
        CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
               atomic_read(&imp->imp_refcount) - 1,
               imp->imp_obd->obd_name);

        if (atomic_dec_and_test(&imp->imp_refcount)) {
                CDEBUG(D_INFO, "final put import %p\n", imp);
                obd_zombie_import_add(imp);
        }

        /* catch possible import put race */
        /* NOTE(review): this reads imp->imp_refcount after our reference
         * has been dropped; presumably safe only while the import has not
         * yet been freed by whoever processes the zombie list -- confirm. */
        LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
        EXIT;
}
EXPORT_SYMBOL(class_import_put);
1033
1034 static void init_imp_at(struct imp_at *at) {
1035         int i;
1036         at_init(&at->iat_net_latency, 0, 0);
1037         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1038                 /* max service estimates are tracked on the server side, so
1039                    don't use the AT history here, just use the last reported
1040                    val. (But keep hist for proc histogram, worst_ever) */
1041                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1042                         AT_FLG_NOHIST);
1043         }
1044 }
1045
1046 struct obd_import *class_new_import(struct obd_device *obd)
1047 {
1048         struct obd_import *imp;
1049
1050         OBD_ALLOC(imp, sizeof(*imp));
1051         if (imp == NULL)
1052                 return NULL;
1053
1054         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1055         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1056         INIT_LIST_HEAD(&imp->imp_replay_list);
1057         INIT_LIST_HEAD(&imp->imp_sending_list);
1058         INIT_LIST_HEAD(&imp->imp_delayed_list);
1059         INIT_LIST_HEAD(&imp->imp_committed_list);
1060         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1061         imp->imp_known_replied_xid = 0;
1062         imp->imp_replay_cursor = &imp->imp_committed_list;
1063         spin_lock_init(&imp->imp_lock);
1064         imp->imp_last_success_conn = 0;
1065         imp->imp_state = LUSTRE_IMP_NEW;
1066         imp->imp_obd = class_incref(obd, "import", imp);
1067         mutex_init(&imp->imp_sec_mutex);
1068         init_waitqueue_head(&imp->imp_recovery_waitq);
1069
1070         atomic_set(&imp->imp_refcount, 2);
1071         atomic_set(&imp->imp_unregistering, 0);
1072         atomic_set(&imp->imp_inflight, 0);
1073         atomic_set(&imp->imp_replay_inflight, 0);
1074         atomic_set(&imp->imp_inval_count, 0);
1075         INIT_LIST_HEAD(&imp->imp_conn_list);
1076         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1077         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1078         init_imp_at(&imp->imp_at);
1079
1080         /* the default magic is V2, will be used in connect RPC, and
1081          * then adjusted according to the flags in request/reply. */
1082         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1083
1084         return imp;
1085 }
1086 EXPORT_SYMBOL(class_new_import);
1087
/*
 * Begin destruction of @import: remove its handle from the handle hash,
 * increment imp_generation under imp_lock, then drop one reference with
 * class_import_put().  @import must be neither NULL nor LP_POISON.
 */
void class_destroy_import(struct obd_import *import)
{
        LASSERT(import != NULL);
        LASSERT(import != LP_POISON);

        class_handle_unhash(&import->imp_handle);

        spin_lock(&import->imp_lock);
        import->imp_generation++;
        spin_unlock(&import->imp_lock);
        class_import_put(import);
}
EXPORT_SYMBOL(class_destroy_import);
1101
1102 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1103
1104 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1105 {
1106         spin_lock(&exp->exp_locks_list_guard);
1107
1108         LASSERT(lock->l_exp_refs_nr >= 0);
1109
1110         if (lock->l_exp_refs_target != NULL &&
1111             lock->l_exp_refs_target != exp) {
1112                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1113                               exp, lock, lock->l_exp_refs_target);
1114         }
1115         if ((lock->l_exp_refs_nr ++) == 0) {
1116                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1117                 lock->l_exp_refs_target = exp;
1118         }
1119         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1120                lock, exp, lock->l_exp_refs_nr);
1121         spin_unlock(&exp->exp_locks_list_guard);
1122 }
1123
1124 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1125 {
1126         spin_lock(&exp->exp_locks_list_guard);
1127         LASSERT(lock->l_exp_refs_nr > 0);
1128         if (lock->l_exp_refs_target != exp) {
1129                 LCONSOLE_WARN("lock %p, "
1130                               "mismatching export pointers: %p, %p\n",
1131                               lock, lock->l_exp_refs_target, exp);
1132         }
1133         if (-- lock->l_exp_refs_nr == 0) {
1134                 list_del_init(&lock->l_exp_refs_link);
1135                 lock->l_exp_refs_target = NULL;
1136         }
1137         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1138                lock, exp, lock->l_exp_refs_nr);
1139         spin_unlock(&exp->exp_locks_list_guard);
1140 }
1141 #endif
1142
1143 /* A connection defines an export context in which preallocation can
1144    be managed. This releases the export pointer reference, and returns
1145    the export handle, so the export refcount is 1 when this function
1146    returns. */
1147 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1148                   struct obd_uuid *cluuid)
1149 {
1150         struct obd_export *export;
1151         LASSERT(conn != NULL);
1152         LASSERT(obd != NULL);
1153         LASSERT(cluuid != NULL);
1154         ENTRY;
1155
1156         export = class_new_export(obd, cluuid);
1157         if (IS_ERR(export))
1158                 RETURN(PTR_ERR(export));
1159
1160         conn->cookie = export->exp_handle.h_cookie;
1161         class_export_put(export);
1162
1163         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1164                cluuid->uuid, conn->cookie);
1165         RETURN(0);
1166 }
1167 EXPORT_SYMBOL(class_connect);
1168
/*
 * If the export is involved in recovery then clean up the related state:
 * while the device is recovering, clear exp_in_recovery (decrementing
 * obd_connected_clients) and count the export as a stale client unless it
 * is a lightweight connection; then, in all cases, clear the request- and
 * lock-replay flags and decrement the matching per-device counters.
 */
static void class_export_recovery_cleanup(struct obd_export *exp)
{
        struct obd_device *obd = exp->exp_obd;

        /* obd_recovery_task_lock covers obd_recovering and obd_stale_clients
         * for the duration of this section */
        spin_lock(&obd->obd_recovery_task_lock);
        if (obd->obd_recovering) {
                if (exp->exp_in_recovery) {
                        /* exp_lock is taken nested inside the recovery
                         * task lock just to clear the flag */
                        spin_lock(&exp->exp_lock);
                        exp->exp_in_recovery = 0;
                        spin_unlock(&exp->exp_lock);
                        LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
                        atomic_dec(&obd->obd_connected_clients);
                }

                /* if called during recovery then should update
                 * obd_stale_clients counter,
                 * lightweight exports are not counted */
                if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
                        exp->exp_obd->obd_stale_clients++;
        }
        spin_unlock(&obd->obd_recovery_task_lock);

        spin_lock(&exp->exp_lock);
        /** Cleanup req replay fields */
        if (exp->exp_req_replay_needed) {
                exp->exp_req_replay_needed = 0;

                /* the counter must still be non-zero before it is dropped */
                LASSERT(atomic_read(&obd->obd_req_replay_clients));
                atomic_dec(&obd->obd_req_replay_clients);
        }

        /** Cleanup lock replay data */
        if (exp->exp_lock_replay_needed) {
                exp->exp_lock_replay_needed = 0;

                /* the counter must still be non-zero before it is dropped */
                LASSERT(atomic_read(&obd->obd_lock_replay_clients));
                atomic_dec(&obd->obd_lock_replay_clients);
        }
        spin_unlock(&exp->exp_lock);
}
1210
/* This function removes 1-3 references from the export:
 * 1 - for the export pointer passed in
 * and, if a disconnect is really needed,
 * 2 - for removing it from the hash
 * 3 - in client_unlink_export (presumably class_unlink_export(), which is
 *     what this function calls -- TODO confirm)
 * The export pointer passed to this function can be destroyed by the time
 * it returns.
 *
 * Returns -EINVAL for a NULL export, 0 otherwise (including when the export
 * had already been disconnected). */
int class_disconnect(struct obd_export *export)
{
        int already_disconnected;
        ENTRY;

        if (export == NULL) {
                CWARN("attempting to free NULL export %p\n", export);
                RETURN(-EINVAL);
        }

        /* test-and-set exp_disconnected under exp_lock so that only one
         * caller performs the actual disconnect */
        spin_lock(&export->exp_lock);
        already_disconnected = export->exp_disconnected;
        export->exp_disconnected = 1;
        spin_unlock(&export->exp_lock);

        /* class_cleanup(), abort_recovery(), and class_fail_export()
         * all end up in here, and if any of them race we shouldn't
         * call extra class_export_puts(). */
        if (already_disconnected) {
                LASSERT(hlist_unhashed(&export->exp_nid_hash));
                GOTO(no_disconn, already_disconnected);
        }

        CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
               export->exp_handle.h_cookie);

        /* remove the export from the NID hash if it is hashed there */
        if (!hlist_unhashed(&export->exp_nid_hash))
                cfs_hash_del(export->exp_obd->obd_nid_hash,
                             &export->exp_connection->c_peer.nid,
                             &export->exp_nid_hash);

        class_export_recovery_cleanup(export);
        class_unlink_export(export);
no_disconn:
        /* drops the reference that came with the pointer passed in */
        class_export_put(export);
        RETURN(0);
}
EXPORT_SYMBOL(class_disconnect);
1255
1256 /* Return non-zero for a fully connected export */
1257 int class_connected_export(struct obd_export *exp)
1258 {
1259         int connected = 0;
1260
1261         if (exp) {
1262                 spin_lock(&exp->exp_lock);
1263                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1264                 spin_unlock(&exp->exp_lock);
1265         }
1266         return connected;
1267 }
1268 EXPORT_SYMBOL(class_connected_export);
1269
/*
 * Disconnect every export on @list (linked through exp_obd_chain), storing
 * @flags in exp_flags of each one first.  An export whose client UUID
 * equals its device UUID is only unlinked from the list, not disconnected.
 * The loop keeps taking the first entry until the list is empty.
 */
static void class_disconnect_export_list(struct list_head *list,
                                         enum obd_option flags)
{
        int rc;
        struct obd_export *exp;
        ENTRY;

        /* It's possible that an export may disconnect itself, but
         * nothing else will be added to this list. */
        while (!list_empty(list)) {
                exp = list_entry(list->next, struct obd_export,
                                 exp_obd_chain);
                /* need for safe call CDEBUG after obd_disconnect */
                class_export_get(exp);

                spin_lock(&exp->exp_lock);
                exp->exp_flags = flags;
                spin_unlock(&exp->exp_lock);

                if (obd_uuid_equals(&exp->exp_client_uuid,
                                    &exp->exp_obd->obd_uuid)) {
                        CDEBUG(D_HA,
                               "exp %p export uuid == obd uuid, don't discon\n",
                               exp);
                        /* Need to delete this now so we don't end up pointing
                         * to work_list later when this export is cleaned up. */
                        list_del_init(&exp->exp_obd_chain);
                        /* drop the reference taken at the top of the loop */
                        class_export_put(exp);
                        continue;
                }

                /* second reference: per the comment below, obd_disconnect()
                 * releases one export reference */
                class_export_get(exp);
                CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
                       "last request at "CFS_TIME_T"\n",
                       exp->exp_obd->obd_name, obd_export_nid2str(exp),
                       exp, exp->exp_last_request_time);
                /* release one export reference anyway */
                rc = obd_disconnect(exp);

                CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
                       obd_export_nid2str(exp), exp, rc);
                /* drop the reference taken at the top of the loop */
                class_export_put(exp);
        }
        EXIT;
}
1315
1316 void class_disconnect_exports(struct obd_device *obd)
1317 {
1318         struct list_head work_list;
1319         ENTRY;
1320
1321         /* Move all of the exports from obd_exports to a work list, en masse. */
1322         INIT_LIST_HEAD(&work_list);
1323         spin_lock(&obd->obd_dev_lock);
1324         list_splice_init(&obd->obd_exports, &work_list);
1325         list_splice_init(&obd->obd_delayed_exports, &work_list);
1326         spin_unlock(&obd->obd_dev_lock);
1327
1328         if (!list_empty(&work_list)) {
1329                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1330                        "disconnecting them\n", obd->obd_minor, obd);
1331                 class_disconnect_export_list(&work_list,
1332                                              exp_flags_from_obd(obd));
1333         } else
1334                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1335                        obd->obd_minor, obd);
1336         EXIT;
1337 }
1338 EXPORT_SYMBOL(class_disconnect_exports);
1339
/* Remove exports that have not completed recovery.
 *
 * Walks obd_exports under obd_dev_lock and moves to a private work list
 * every export that is not the self-export, has a last_rcvd slot, is not
 * already failed, and for which @test_export returns zero.  Those exports
 * are marked failed and then disconnected with OBD_OPT_ABORT_RECOV added
 * to the device flags.
 */
void class_disconnect_stale_exports(struct obd_device *obd,
                                    int (*test_export)(struct obd_export *))
{
        struct list_head work_list;
        struct obd_export *exp, *n;
        int evicted = 0;
        ENTRY;

        INIT_LIST_HEAD(&work_list);
        spin_lock(&obd->obd_dev_lock);
        list_for_each_entry_safe(exp, n, &obd->obd_exports,
                                 exp_obd_chain) {
                /* don't count self-export as client */
                if (obd_uuid_equals(&exp->exp_client_uuid,
                                    &exp->exp_obd->obd_uuid))
                        continue;

                /* don't evict clients which have no slot in last_rcvd
                 * (e.g. lightweight connection) */
                if (exp->exp_target_data.ted_lr_idx == -1)
                        continue;

                /* @test_export is called with exp_lock held */
                spin_lock(&exp->exp_lock);
                if (exp->exp_failed || test_export(exp)) {
                        spin_unlock(&exp->exp_lock);
                        continue;
                }
                exp->exp_failed = 1;
                spin_unlock(&exp->exp_lock);

                list_move(&exp->exp_obd_chain, &work_list);
                evicted++;
                CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
                       obd->obd_name, exp->exp_client_uuid.uuid,
                       exp->exp_connection == NULL ? "<unknown>" :
                       libcfs_nid2str(exp->exp_connection->c_peer.nid));
                print_export_data(exp, "EVICTING", 0);
        }
        spin_unlock(&obd->obd_dev_lock);

        if (evicted)
                LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
                              obd->obd_name, evicted);

        /* called even when nothing was moved; the helper's loop is then a
         * no-op on the empty list */
        class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
                                                 OBD_OPT_ABORT_RECOV);
        EXIT;
}
EXPORT_SYMBOL(class_disconnect_stale_exports);
1391
/*
 * Mark @exp as failed and disconnect it.  If the export was already marked
 * failed, only a debug message is emitted and nothing else is done.
 * A failure from obd_disconnect() is logged but not returned.
 */
void class_fail_export(struct obd_export *exp)
{
        int rc, already_failed;

        /* test-and-set exp_failed under exp_lock so only one caller
         * proceeds to disconnect */
        spin_lock(&exp->exp_lock);
        already_failed = exp->exp_failed;
        exp->exp_failed = 1;
        spin_unlock(&exp->exp_lock);

        if (already_failed) {
                CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
                       exp, exp->exp_client_uuid.uuid);
                return;
        }

        CDEBUG(D_HA, "disconnecting export %p/%s\n",
               exp, exp->exp_client_uuid.uuid);

        if (obd_dump_on_timeout)
                libcfs_debug_dumplog();

        /* need for safe call CDEBUG after obd_disconnect */
        class_export_get(exp);

        /* Most callers into obd_disconnect are removing their own reference
         * (request, for example) in addition to the one from the hash table.
         * We don't have such a reference here, so make one. */
        class_export_get(exp);
        rc = obd_disconnect(exp);
        if (rc)
                CERROR("disconnecting export %p failed: %d\n", exp, rc);
        else
                CDEBUG(D_HA, "disconnected export %p/%s\n",
                       exp, exp->exp_client_uuid.uuid);
        /* drop the first of the two references taken above */
        class_export_put(exp);
}
EXPORT_SYMBOL(class_fail_export);
1429
1430 char *obd_export_nid2str(struct obd_export *exp)
1431 {
1432         if (exp->exp_connection != NULL)
1433                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1434
1435         return "(no nid)";
1436 }
1437 EXPORT_SYMBOL(obd_export_nid2str);
1438
1439 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1440 {
1441         struct cfs_hash *nid_hash;
1442         struct obd_export *doomed_exp = NULL;
1443         int exports_evicted = 0;
1444
1445         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1446
1447         spin_lock(&obd->obd_dev_lock);
1448         /* umount has run already, so evict thread should leave
1449          * its task to umount thread now */
1450         if (obd->obd_stopping) {
1451                 spin_unlock(&obd->obd_dev_lock);
1452                 return exports_evicted;
1453         }
1454         nid_hash = obd->obd_nid_hash;
1455         cfs_hash_getref(nid_hash);
1456         spin_unlock(&obd->obd_dev_lock);
1457
1458         do {
1459                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1460                 if (doomed_exp == NULL)
1461                         break;
1462
1463                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1464                          "nid %s found, wanted nid %s, requested nid %s\n",
1465                          obd_export_nid2str(doomed_exp),
1466                          libcfs_nid2str(nid_key), nid);
1467                 LASSERTF(doomed_exp != obd->obd_self_export,
1468                          "self-export is hashed by NID?\n");
1469                 exports_evicted++;
1470                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1471                               "request\n", obd->obd_name,
1472                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1473                               obd_export_nid2str(doomed_exp));
1474                 class_fail_export(doomed_exp);
1475                 class_export_put(doomed_exp);
1476         } while (1);
1477
1478         cfs_hash_putref(nid_hash);
1479
1480         if (!exports_evicted)
1481                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1482                        obd->obd_name, nid);
1483         return exports_evicted;
1484 }
1485 EXPORT_SYMBOL(obd_export_evict_by_nid);
1486
1487 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1488 {
1489         struct cfs_hash *uuid_hash;
1490         struct obd_export *doomed_exp = NULL;
1491         struct obd_uuid doomed_uuid;
1492         int exports_evicted = 0;
1493
1494         spin_lock(&obd->obd_dev_lock);
1495         if (obd->obd_stopping) {
1496                 spin_unlock(&obd->obd_dev_lock);
1497                 return exports_evicted;
1498         }
1499         uuid_hash = obd->obd_uuid_hash;
1500         cfs_hash_getref(uuid_hash);
1501         spin_unlock(&obd->obd_dev_lock);
1502
1503         obd_str2uuid(&doomed_uuid, uuid);
1504         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1505                 CERROR("%s: can't evict myself\n", obd->obd_name);
1506                 cfs_hash_putref(uuid_hash);
1507                 return exports_evicted;
1508         }
1509
1510         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1511
1512         if (doomed_exp == NULL) {
1513                 CERROR("%s: can't disconnect %s: no exports found\n",
1514                        obd->obd_name, uuid);
1515         } else {
1516                 CWARN("%s: evicting %s at adminstrative request\n",
1517                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1518                 class_fail_export(doomed_exp);
1519                 class_export_put(doomed_exp);
1520                 exports_evicted++;
1521         }
1522         cfs_hash_putref(uuid_hash);
1523
1524         return exports_evicted;
1525 }
1526
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback, NULL by default; print_export_data() invokes it for
 * an export when asked to dump locks and the hook has been set. */
void (*class_export_dump_hook)(struct obd_export*) = NULL;
#endif
1530
1531 static void print_export_data(struct obd_export *exp, const char *status,
1532                               int locks)
1533 {
1534         struct ptlrpc_reply_state *rs;
1535         struct ptlrpc_reply_state *first_reply = NULL;
1536         int nreplies = 0;
1537
1538         spin_lock(&exp->exp_lock);
1539         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1540                             rs_exp_list) {
1541                 if (nreplies == 0)
1542                         first_reply = rs;
1543                 nreplies++;
1544         }
1545         spin_unlock(&exp->exp_lock);
1546
1547         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1548                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1549                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1550                atomic_read(&exp->exp_rpc_count),
1551                atomic_read(&exp->exp_cb_count),
1552                atomic_read(&exp->exp_locks_count),
1553                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1554                nreplies, first_reply, nreplies > 3 ? "..." : "",
1555                exp->exp_last_committed);
1556 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1557         if (locks && class_export_dump_hook != NULL)
1558                 class_export_dump_hook(exp);
1559 #endif
1560 }
1561
1562 void dump_exports(struct obd_device *obd, int locks)
1563 {
1564         struct obd_export *exp;
1565
1566         spin_lock(&obd->obd_dev_lock);
1567         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1568                 print_export_data(exp, "ACTIVE", locks);
1569         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1570                 print_export_data(exp, "UNLINKED", locks);
1571         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1572                 print_export_data(exp, "DELAYED", locks);
1573         spin_unlock(&obd->obd_dev_lock);
1574         spin_lock(&obd_zombie_impexp_lock);
1575         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1576                 print_export_data(exp, "ZOMBIE", locks);
1577         spin_unlock(&obd_zombie_impexp_lock);
1578 }
1579
/*
 * Block until obd_unlinked_exports of @obd is empty.  obd_exports must
 * already be empty on entry.  The sleep between checks starts at 2 seconds
 * and doubles every round; once it exceeds 5 seconds (and is a power of
 * two) a console warning is printed and all exports are dumped.
 */
void obd_exports_barrier(struct obd_device *obd)
{
        int waited = 2;
        LASSERT(list_empty(&obd->obd_exports));
        spin_lock(&obd->obd_dev_lock);
        while (!list_empty(&obd->obd_unlinked_exports)) {
                /* drop the lock while sleeping; it is re-taken before the
                 * list is checked again */
                spin_unlock(&obd->obd_dev_lock);
                set_current_state(TASK_UNINTERRUPTIBLE);
                schedule_timeout(cfs_time_seconds(waited));
                /* NOTE(review): 'waited' is the length of the sleep that
                 * just finished, not the total time spent here -- confirm
                 * the message wording is intended */
                if (waited > 5 && IS_PO2(waited)) {
                        LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
                                      "more than %d seconds. "
                                      "The obd refcount = %d. Is it stuck?\n",
                                      obd->obd_name, waited,
                                      atomic_read(&obd->obd_refcount));
                        dump_exports(obd, 1);
                }
                waited *= 2;
                spin_lock(&obd->obd_dev_lock);
        }
        spin_unlock(&obd->obd_dev_lock);
}
EXPORT_SYMBOL(obd_exports_barrier);
1603
/*
 * Number of zombie imports and exports queued and not yet destroyed.
 * Modified under obd_zombie_impexp_lock.
 */
static int zombies_count;
1606
/**
 * Kill zombie imports and exports.
 *
 * Each round takes at most one import and one export off the zombie lists
 * under obd_zombie_impexp_lock, destroys them with the lock released, and
 * decrements zombies_count for each.  The loop ends after a round in which
 * both lists were found empty.
 */
void obd_zombie_impexp_cull(void)
{
        struct obd_import *import;
        struct obd_export *export;
        ENTRY;

        do {
                spin_lock(&obd_zombie_impexp_lock);

                /* detach the first queued import, if any */
                import = NULL;
                if (!list_empty(&obd_zombie_imports)) {
                        import = list_entry(obd_zombie_imports.next,
                                            struct obd_import,
                                            imp_zombie_chain);
                        list_del_init(&import->imp_zombie_chain);
                }

                /* detach the first queued export, if any */
                export = NULL;
                if (!list_empty(&obd_zombie_exports)) {
                        export = list_entry(obd_zombie_exports.next,
                                            struct obd_export,
                                            exp_obd_chain);
                        list_del_init(&export->exp_obd_chain);
                }

                spin_unlock(&obd_zombie_impexp_lock);

                /* destruction runs unlocked; the counter is only dropped
                 * once the object has actually been destroyed */
                if (import != NULL) {
                        class_import_destroy(import);
                        spin_lock(&obd_zombie_impexp_lock);
                        zombies_count--;
                        spin_unlock(&obd_zombie_impexp_lock);
                }

                if (export != NULL) {
                        class_export_destroy(export);
                        spin_lock(&obd_zombie_impexp_lock);
                        zombies_count--;
                        spin_unlock(&obd_zombie_impexp_lock);
                }

                /* give other tasks a chance between rounds */
                cond_resched();
        } while (import != NULL || export != NULL);
        EXIT;
}
1655
/* State of the zombie import/export thread (start/stop completions, flag
 * word, wait queue and pid). */
static struct completion        obd_zombie_start;
static struct completion        obd_zombie_stop;
static unsigned long            obd_zombie_flags;
static wait_queue_head_t        obd_zombie_waitq;
static pid_t                    obd_zombie_pid;

/* Values used with obd_zombie_flags.
 * NOTE(review): OBD_ZOMBIE_STOP is handed to test_bit() as a bit number,
 * so 0x0001 selects bit 1 of the word, not mask 0x1 -- confirm that every
 * setter uses the same convention. */
enum {
        OBD_ZOMBIE_STOP         = 0x0001,
};
1665
1666 /**
1667  * check for work for kill zombie import/export thread.
1668  */
1669 static int obd_zombie_impexp_check(void *arg)
1670 {
1671         int rc;
1672
1673         spin_lock(&obd_zombie_impexp_lock);
1674         rc = (zombies_count == 0) &&
1675              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1676         spin_unlock(&obd_zombie_impexp_lock);
1677
1678         RETURN(rc);
1679 }
1680
1681 /**
1682  * Add export to the obd_zombe thread and notify it.
1683  */
1684 static void obd_zombie_export_add(struct obd_export *exp) {
1685         atomic_dec(&obd_stale_export_num);
1686         spin_lock(&exp->exp_obd->obd_dev_lock);
1687         LASSERT(!list_empty(&exp->exp_obd_chain));
1688         list_del_init(&exp->exp_obd_chain);
1689         spin_unlock(&exp->exp_obd->obd_dev_lock);
1690         spin_lock(&obd_zombie_impexp_lock);
1691         zombies_count++;
1692         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1693         spin_unlock(&obd_zombie_impexp_lock);
1694
1695         obd_zombie_impexp_notify();
1696 }
1697
1698 /**
1699  * Add import to the obd_zombe thread and notify it.
1700  */
1701 static void obd_zombie_import_add(struct obd_import *imp) {
1702         LASSERT(imp->imp_sec == NULL);
1703         spin_lock(&obd_zombie_impexp_lock);
1704         LASSERT(list_empty(&imp->imp_zombie_chain));
1705         zombies_count++;
1706         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1707         spin_unlock(&obd_zombie_impexp_lock);
1708
1709         obd_zombie_impexp_notify();
1710 }
1711
/**
 * Notify the import/export destroy thread about a new zombie (or about a
 * stop request) by waking the waiters on obd_zombie_waitq.
 */
static void obd_zombie_impexp_notify(void)
{
        /*
         * Make sure obd_zombie_impexp_thread gets this notification.
         * Both that thread and obd_zombie_barrier() sleep on the same
         * wait queue; if only the barrier were woken it would swallow the
         * notification, go back to sleep, and a hang would ensue.
         * Hence wake everybody.
         */
        wake_up_all(&obd_zombie_waitq);
}
1724
1725 /**
1726  * check whether obd_zombie is idle
1727  */
1728 static int obd_zombie_is_idle(void)
1729 {
1730         int rc;
1731
1732         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1733         spin_lock(&obd_zombie_impexp_lock);
1734         rc = (zombies_count == 0);
1735         spin_unlock(&obd_zombie_impexp_lock);
1736         return rc;
1737 }
1738
1739 /**
1740  * wait when obd_zombie import/export queues become empty
1741  */
1742 void obd_zombie_barrier(void)
1743 {
1744         struct l_wait_info lwi = { 0 };
1745
1746         if (obd_zombie_pid == current_pid())
1747                 /* don't wait for myself */
1748                 return;
1749         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1750 }
1751 EXPORT_SYMBOL(obd_zombie_barrier);
1752
1753
1754 struct obd_export *obd_stale_export_get(void)
1755 {
1756         struct obd_export *exp = NULL;
1757         ENTRY;
1758
1759         spin_lock(&obd_stale_export_lock);
1760         if (!list_empty(&obd_stale_exports)) {
1761                 exp = list_entry(obd_stale_exports.next,
1762                                  struct obd_export, exp_stale_list);
1763                 list_del_init(&exp->exp_stale_list);
1764         }
1765         spin_unlock(&obd_stale_export_lock);
1766
1767         if (exp) {
1768                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1769                        atomic_read(&obd_stale_export_num));
1770         }
1771         RETURN(exp);
1772 }
1773 EXPORT_SYMBOL(obd_stale_export_get);
1774
1775 void obd_stale_export_put(struct obd_export *exp)
1776 {
1777         ENTRY;
1778
1779         LASSERT(list_empty(&exp->exp_stale_list));
1780         if (exp->exp_lock_hash &&
1781             atomic_read(&exp->exp_lock_hash->hs_count)) {
1782                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1783                        atomic_read(&obd_stale_export_num));
1784
1785                 spin_lock_bh(&exp->exp_bl_list_lock);
1786                 spin_lock(&obd_stale_export_lock);
1787                 /* Add to the tail if there is no blocked locks,
1788                  * to the head otherwise. */
1789                 if (list_empty(&exp->exp_bl_list))
1790                         list_add_tail(&exp->exp_stale_list,
1791                                       &obd_stale_exports);
1792                 else
1793                         list_add(&exp->exp_stale_list,
1794                                  &obd_stale_exports);
1795
1796                 spin_unlock(&obd_stale_export_lock);
1797                 spin_unlock_bh(&exp->exp_bl_list_lock);
1798         } else {
1799                 class_export_put(exp);
1800         }
1801         EXIT;
1802 }
1803 EXPORT_SYMBOL(obd_stale_export_put);
1804
1805 /**
1806  * Adjust the position of the export in the stale list,
1807  * i.e. move to the head of the list if is needed.
1808  **/
1809 void obd_stale_export_adjust(struct obd_export *exp)
1810 {
1811         LASSERT(exp != NULL);
1812         spin_lock_bh(&exp->exp_bl_list_lock);
1813         spin_lock(&obd_stale_export_lock);
1814
1815         if (!list_empty(&exp->exp_stale_list) &&
1816             !list_empty(&exp->exp_bl_list))
1817                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1818
1819         spin_unlock(&obd_stale_export_lock);
1820         spin_unlock_bh(&exp->exp_bl_list_lock);
1821 }
1822 EXPORT_SYMBOL(obd_stale_export_adjust);
1823
1824 /**
1825  * destroy zombie export/import thread.
1826  */
1827 static int obd_zombie_impexp_thread(void *unused)
1828 {
1829         unshare_fs_struct();
1830         complete(&obd_zombie_start);
1831
1832         obd_zombie_pid = current_pid();
1833
1834         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1835                 struct l_wait_info lwi = { 0 };
1836
1837                 l_wait_event(obd_zombie_waitq,
1838                              !obd_zombie_impexp_check(NULL), &lwi);
1839                 obd_zombie_impexp_cull();
1840
1841                 /*
1842                  * Notify obd_zombie_barrier callers that queues
1843                  * may be empty.
1844                  */
1845                 wake_up(&obd_zombie_waitq);
1846         }
1847
1848         complete(&obd_zombie_stop);
1849
1850         RETURN(0);
1851 }
1852
1853
1854 /**
1855  * start destroy zombie import/export thread
1856  */
1857 int obd_zombie_impexp_init(void)
1858 {
1859         struct task_struct *task;
1860
1861         INIT_LIST_HEAD(&obd_zombie_imports);
1862
1863         INIT_LIST_HEAD(&obd_zombie_exports);
1864         spin_lock_init(&obd_zombie_impexp_lock);
1865         init_completion(&obd_zombie_start);
1866         init_completion(&obd_zombie_stop);
1867         init_waitqueue_head(&obd_zombie_waitq);
1868         obd_zombie_pid = 0;
1869
1870         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1871         if (IS_ERR(task))
1872                 RETURN(PTR_ERR(task));
1873
1874         wait_for_completion(&obd_zombie_start);
1875         RETURN(0);
1876 }
/**
 * Stop the thread that destroys zombie imports/exports.
 *
 * Sets OBD_ZOMBIE_STOP, wakes the thread so that it notices the flag, and
 * blocks until the thread has signalled obd_zombie_stop.
 */
void obd_zombie_impexp_stop(void)
{
        /* the flag must be visible before the thread is woken */
        set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
        obd_zombie_impexp_notify();
        wait_for_completion(&obd_zombie_stop);
}
1886
1887 /***** Kernel-userspace comm helpers *******/
1888
1889 /* Get length of entire message, including header */
1890 int kuc_len(int payload_len)
1891 {
1892         return sizeof(struct kuc_hdr) + payload_len;
1893 }
1894 EXPORT_SYMBOL(kuc_len);
1895
1896 /* Get a pointer to kuc header, given a ptr to the payload
1897  * @param p Pointer to payload area
1898  * @returns Pointer to kuc header
1899  */
1900 struct kuc_hdr * kuc_ptr(void *p)
1901 {
1902         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1903         LASSERT(lh->kuc_magic == KUC_MAGIC);
1904         return lh;
1905 }
1906 EXPORT_SYMBOL(kuc_ptr);
1907
1908 /* Test if payload is part of kuc message
1909  * @param p Pointer to payload area
1910  * @returns boolean
1911  */
1912 int kuc_ispayload(void *p)
1913 {
1914         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1915
1916         if (kh->kuc_magic == KUC_MAGIC)
1917                 return 1;
1918         else
1919                 return 0;
1920 }
1921 EXPORT_SYMBOL(kuc_ispayload);
1922
1923 /* Alloc space for a message, and fill in header
1924  * @return Pointer to payload area
1925  */
1926 void *kuc_alloc(int payload_len, int transport, int type)
1927 {
1928         struct kuc_hdr *lh;
1929         int len = kuc_len(payload_len);
1930
1931         OBD_ALLOC(lh, len);
1932         if (lh == NULL)
1933                 return ERR_PTR(-ENOMEM);
1934
1935         lh->kuc_magic = KUC_MAGIC;
1936         lh->kuc_transport = transport;
1937         lh->kuc_msgtype = type;
1938         lh->kuc_msglen = len;
1939
1940         return (void *)(lh + 1);
1941 }
1942 EXPORT_SYMBOL(kuc_alloc);
1943
/* Free a message allocated with kuc_alloc().
 *
 * Not declared 'inline': the function is exported, and with C99 inline
 * semantics a plain 'inline' definition does not provide the external
 * definition that the export needs.
 *
 * @param p Pointer to payload area (not to the header)
 * @param payload_len Payload length that was passed to kuc_alloc()
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1951
1952 struct obd_request_slot_waiter {
1953         struct list_head        orsw_entry;
1954         wait_queue_head_t       orsw_waitq;
1955         bool                    orsw_signaled;
1956 };
1957
1958 static bool obd_request_slot_avail(struct client_obd *cli,
1959                                    struct obd_request_slot_waiter *orsw)
1960 {
1961         bool avail;
1962
1963         spin_lock(&cli->cl_loi_list_lock);
1964         avail = !!list_empty(&orsw->orsw_entry);
1965         spin_unlock(&cli->cl_loi_list_lock);
1966
1967         return avail;
1968 };
1969
1970 /*
1971  * For network flow control, the RPC sponsor needs to acquire a credit
1972  * before sending the RPC. The credits count for a connection is defined
1973  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1974  * the subsequent RPC sponsors need to wait until others released their
1975  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1976  */
1977 int obd_get_request_slot(struct client_obd *cli)
1978 {
1979         struct obd_request_slot_waiter   orsw;
1980         struct l_wait_info               lwi;
1981         int                              rc;
1982
1983         spin_lock(&cli->cl_loi_list_lock);
1984         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1985                 cli->cl_r_in_flight++;
1986                 spin_unlock(&cli->cl_loi_list_lock);
1987                 return 0;
1988         }
1989
1990         init_waitqueue_head(&orsw.orsw_waitq);
1991         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1992         orsw.orsw_signaled = false;
1993         spin_unlock(&cli->cl_loi_list_lock);
1994
1995         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1996         rc = l_wait_event(orsw.orsw_waitq,
1997                           obd_request_slot_avail(cli, &orsw) ||
1998                           orsw.orsw_signaled,
1999                           &lwi);
2000
2001         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2002          * freed but other (such as obd_put_request_slot) is using it. */
2003         spin_lock(&cli->cl_loi_list_lock);
2004         if (rc != 0) {
2005                 if (!orsw.orsw_signaled) {
2006                         if (list_empty(&orsw.orsw_entry))
2007                                 cli->cl_r_in_flight--;
2008                         else
2009                                 list_del(&orsw.orsw_entry);
2010                 }
2011         }
2012
2013         if (orsw.orsw_signaled) {
2014                 LASSERT(list_empty(&orsw.orsw_entry));
2015
2016                 rc = -EINTR;
2017         }
2018         spin_unlock(&cli->cl_loi_list_lock);
2019
2020         return rc;
2021 }
2022 EXPORT_SYMBOL(obd_get_request_slot);
2023
2024 void obd_put_request_slot(struct client_obd *cli)
2025 {
2026         struct obd_request_slot_waiter *orsw;
2027
2028         spin_lock(&cli->cl_loi_list_lock);
2029         cli->cl_r_in_flight--;
2030
2031         /* If there is free slot, wakeup the first waiter. */
2032         if (!list_empty(&cli->cl_loi_read_list) &&
2033             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2034                 orsw = list_entry(cli->cl_loi_read_list.next,
2035                                   struct obd_request_slot_waiter, orsw_entry);
2036                 list_del_init(&orsw->orsw_entry);
2037                 cli->cl_r_in_flight++;
2038                 wake_up(&orsw->orsw_waitq);
2039         }
2040         spin_unlock(&cli->cl_loi_list_lock);
2041 }
2042 EXPORT_SYMBOL(obd_put_request_slot);
2043
2044 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2045 {
2046         return cli->cl_max_rpcs_in_flight;
2047 }
2048 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2049
2050 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2051 {
2052         struct obd_request_slot_waiter *orsw;
2053         __u32                           old;
2054         int                             diff;
2055         int                             i;
2056         char                            *typ_name;
2057         int                             rc;
2058
2059         if (max > OBD_MAX_RIF_MAX || max < 1)
2060                 return -ERANGE;
2061
2062         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2063         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2064                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2065                  * strictly lower that max_rpcs_in_flight */
2066                 if (max < 2) {
2067                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2068                                "because it must be higher than "
2069                                "max_mod_rpcs_in_flight value",
2070                                cli->cl_import->imp_obd->obd_name);
2071                         return -ERANGE;
2072                 }
2073                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2074                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2075                         if (rc != 0)
2076                                 return rc;
2077                 }
2078         }
2079
2080         spin_lock(&cli->cl_loi_list_lock);
2081         old = cli->cl_max_rpcs_in_flight;
2082         cli->cl_max_rpcs_in_flight = max;
2083         diff = max - old;
2084
2085         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2086         for (i = 0; i < diff; i++) {
2087                 if (list_empty(&cli->cl_loi_read_list))
2088                         break;
2089
2090                 orsw = list_entry(cli->cl_loi_read_list.next,
2091                                   struct obd_request_slot_waiter, orsw_entry);
2092                 list_del_init(&orsw->orsw_entry);
2093                 cli->cl_r_in_flight++;
2094                 wake_up(&orsw->orsw_waitq);
2095         }
2096         spin_unlock(&cli->cl_loi_list_lock);
2097
2098         return 0;
2099 }
2100 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2101
2102 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2103 {
2104         return cli->cl_max_mod_rpcs_in_flight;
2105 }
2106 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2107
2108 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2109 {
2110         struct obd_connect_data *ocd;
2111         __u16 maxmodrpcs;
2112         __u16 prev;
2113
2114         if (max > OBD_MAX_RIF_MAX || max < 1)
2115                 return -ERANGE;
2116
2117         /* cannot exceed or equal max_rpcs_in_flight */
2118         if (max >= cli->cl_max_rpcs_in_flight) {
2119                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2120                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2121                        cli->cl_import->imp_obd->obd_name,
2122                        max, cli->cl_max_rpcs_in_flight);
2123                 return -ERANGE;
2124         }
2125
2126         /* cannot exceed max modify RPCs in flight supported by the server */
2127         ocd = &cli->cl_import->imp_connect_data;
2128         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2129                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2130         else
2131                 maxmodrpcs = 1;
2132         if (max > maxmodrpcs) {
2133                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2134                        "higher than max_mod_rpcs_per_client value (%hu) "
2135                        "returned by the server at connection\n",
2136                        cli->cl_import->imp_obd->obd_name,
2137                        max, maxmodrpcs);
2138                 return -ERANGE;
2139         }
2140
2141         spin_lock(&cli->cl_mod_rpcs_lock);
2142
2143         prev = cli->cl_max_mod_rpcs_in_flight;
2144         cli->cl_max_mod_rpcs_in_flight = max;
2145
2146         /* wakeup waiters if limit has been increased */
2147         if (cli->cl_max_mod_rpcs_in_flight > prev)
2148                 wake_up(&cli->cl_mod_rpcs_waitq);
2149
2150         spin_unlock(&cli->cl_mod_rpcs_lock);
2151
2152         return 0;
2153 }
2154 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2155
2156
2157 #define pct(a, b) (b ? a * 100 / b : 0)
2158 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2159                                struct seq_file *seq)
2160 {
2161         struct timeval now;
2162         unsigned long mod_tot = 0, mod_cum;
2163         int i;
2164
2165         do_gettimeofday(&now);
2166
2167         spin_lock(&cli->cl_mod_rpcs_lock);
2168
2169         seq_printf(seq, "snapshot_time:         %lu.%lu (secs.usecs)\n",
2170                    now.tv_sec, now.tv_usec);
2171         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2172                    cli->cl_mod_rpcs_in_flight);
2173
2174         seq_printf(seq, "\n\t\t\tmodify\n");
2175         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2176
2177         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2178
2179         mod_cum = 0;
2180         for (i = 0; i < OBD_HIST_MAX; i++) {
2181                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2182                 mod_cum += mod;
2183                 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2184                                  i, mod, pct(mod, mod_tot),
2185                                  pct(mod_cum, mod_tot));
2186                 if (mod_cum == mod_tot)
2187                         break;
2188         }
2189
2190         spin_unlock(&cli->cl_mod_rpcs_lock);
2191
2192         return 0;
2193 }
2194 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2195 #undef pct
2196
2197
2198 /* The number of modify RPCs sent in parallel is limited
2199  * because the server has a finite number of slots per client to
2200  * store request result and ensure reply reconstruction when needed.
2201  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2202  * that takes into account server limit and cl_max_rpcs_in_flight
2203  * value.
2204  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2205  * one close request is allowed above the maximum.
2206  */
2207 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2208                                                  bool close_req)
2209 {
2210         bool avail;
2211
2212         /* A slot is available if
2213          * - number of modify RPCs in flight is less than the max
2214          * - it's a close RPC and no other close request is in flight
2215          */
2216         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2217                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2218
2219         return avail;
2220 }
2221
2222 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2223                                          bool close_req)
2224 {
2225         bool avail;
2226
2227         spin_lock(&cli->cl_mod_rpcs_lock);
2228         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2229         spin_unlock(&cli->cl_mod_rpcs_lock);
2230         return avail;
2231 }
2232
2233 /* Get a modify RPC slot from the obd client @cli according
2234  * to the kind of operation @opc that is going to be sent
2235  * and the intent @it of the operation if it applies.
2236  * If the maximum number of modify RPCs in flight is reached
2237  * the thread is put to sleep.
2238  * Returns the tag to be set in the request message. Tag 0
2239  * is reserved for non-modifying requests.
2240  */
2241 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2242                            struct lookup_intent *it)
2243 {
2244         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2245         bool                    close_req = false;
2246         __u16                   i, max;
2247
2248         /* read-only metadata RPCs don't consume a slot on MDT
2249          * for reply reconstruction
2250          */
2251         if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2252                            it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2253                 return 0;
2254
2255         if (opc == MDS_CLOSE)
2256                 close_req = true;
2257
2258         do {
2259                 spin_lock(&cli->cl_mod_rpcs_lock);
2260                 max = cli->cl_max_mod_rpcs_in_flight;
2261                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2262                         /* there is a slot available */
2263                         cli->cl_mod_rpcs_in_flight++;
2264                         if (close_req)
2265                                 cli->cl_close_rpcs_in_flight++;
2266                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2267                                          cli->cl_mod_rpcs_in_flight);
2268                         /* find a free tag */
2269                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2270                                                 max + 1);
2271                         LASSERT(i < OBD_MAX_RIF_MAX);
2272                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2273                         spin_unlock(&cli->cl_mod_rpcs_lock);
2274                         /* tag 0 is reserved for non-modify RPCs */
2275                         return i + 1;
2276                 }
2277                 spin_unlock(&cli->cl_mod_rpcs_lock);
2278
2279                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2280                        "opc %u, max %hu\n",
2281                        cli->cl_import->imp_obd->obd_name, opc, max);
2282
2283                 l_wait_event(cli->cl_mod_rpcs_waitq,
2284                              obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2285         } while (true);
2286 }
2287 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2288
2289 /* Put a modify RPC slot from the obd client @cli according
2290  * to the kind of operation @opc that has been sent and the
2291  * intent @it of the operation if it applies.
2292  */
2293 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2294                           struct lookup_intent *it, __u16 tag)
2295 {
2296         bool                    close_req = false;
2297
2298         if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2299                            it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2300                 return;
2301
2302         if (opc == MDS_CLOSE)
2303                 close_req = true;
2304
2305         spin_lock(&cli->cl_mod_rpcs_lock);
2306         cli->cl_mod_rpcs_in_flight--;
2307         if (close_req)
2308                 cli->cl_close_rpcs_in_flight--;
2309         /* release the tag in the bitmap */
2310         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2311         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2312         spin_unlock(&cli->cl_mod_rpcs_lock);
2313         wake_up(&cli->cl_mod_rpcs_waitq);
2314 }
2315 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2316