Whamcloud - gitweb
LU-17705 ptlrpc: replace synchronize_rcu() with rcu_barrier()
[fs/lustre-release.git] / lustre / obdecho / echo.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2010, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/obdecho/echo.c
32  *
33  * Author: Peter Braam <braam@clusterfs.com>
34  * Author: Andreas Dilger <adilger@clusterfs.com>
35  */
36
37 #define DEBUG_SUBSYSTEM S_ECHO
38
39 #include <obd_support.h>
40 #include <obd_class.h>
41 #include <lustre_dlm.h>
42 #include <lprocfs_status.h>
43
44 #include "echo_internal.h"
45
46 /*
47  * The echo objid needs to be below 2^32, because regular FID numbers are
48  * limited to 2^32 objects in f_oid for the FID_SEQ_ECHO range. b=23335
49  */
50 #define ECHO_INIT_OID        0x10000000ULL
51 #define ECHO_HANDLE_MAGIC    0xabcd0123fedc9876ULL
52
53 #define ECHO_PERSISTENT_PAGES (ECHO_PERSISTENT_SIZE >> PAGE_SHIFT)
54 static struct page *echo_persistent_pages[ECHO_PERSISTENT_PAGES];
55
56 enum {
57         LPROC_ECHO_READ_BYTES = 1,
58         LPROC_ECHO_WRITE_BYTES = 2,
59         LPROC_ECHO_LAST = LPROC_ECHO_WRITE_BYTES + 1
60 };
61
62 struct echo_srv_device {
63         struct lu_device esd_dev;
64         struct lu_target esd_lut;
65 };
66
67 static inline struct echo_srv_device *echo_srv_dev(struct lu_device *d)
68 {
69         return container_of_safe(d, struct echo_srv_device, esd_dev);
70 }
71
72 static inline struct obd_device *echo_srv_obd(struct echo_srv_device *esd)
73 {
74         return esd->esd_dev.ld_obd;
75 }
76
77 static int echo_connect(const struct lu_env *env,
78                         struct obd_export **exp, struct obd_device *obd,
79                         struct obd_uuid *cluuid, struct obd_connect_data *data,
80                         void *localdata)
81 {
82         struct lustre_handle conn = { 0 };
83         int rc;
84
85         data->ocd_connect_flags &= ECHO_CONNECT_SUPPORTED;
86
87         if (data->ocd_connect_flags & OBD_CONNECT_FLAGS2)
88                 data->ocd_connect_flags2 &= ECHO_CONNECT_SUPPORTED2;
89
90         rc = class_connect(&conn, obd, cluuid);
91         if (rc) {
92                 CERROR("can't connect %d\n", rc);
93                 return rc;
94         }
95         *exp = class_conn2export(&conn);
96
97         return 0;
98 }
99
100 static int echo_disconnect(struct obd_export *exp)
101 {
102         LASSERT(exp != NULL);
103
104         return server_disconnect_export(exp);
105 }
106
107 static int echo_init_export(struct obd_export *exp)
108 {
109         return ldlm_init_export(exp);
110 }
111
112 static int echo_destroy_export(struct obd_export *exp)
113 {
114         ENTRY;
115
116         target_destroy_export(exp);
117         ldlm_destroy_export(exp);
118
119         RETURN(0);
120 }
121
122 static u64 echo_next_id(struct obd_device *obd)
123 {
124         u64 id;
125         struct echo_obd *echo = obd2echo(obd);
126
127         spin_lock(&echo->eo_lock);
128         id = ++echo->eo_lastino;
129         spin_unlock(&echo->eo_lock);
130
131         return id;
132 }
133
134 static void
135 echo_page_debug_setup(struct page *page, int rw, u64 id,
136                       __u64 offset, int len)
137 {
138         int   page_offset = offset & ~PAGE_MASK;
139         char *addr        = ((char *)kmap(page)) + page_offset;
140
141         if (len % OBD_ECHO_BLOCK_SIZE != 0)
142                 CERROR("Unexpected block size %d\n", len);
143
144         while (len > 0) {
145                 if (rw & OBD_BRW_READ)
146                         block_debug_setup(addr, OBD_ECHO_BLOCK_SIZE,
147                                           offset, id);
148                 else
149                         block_debug_setup(addr, OBD_ECHO_BLOCK_SIZE,
150                                           0xecc0ecc0ecc0ecc0ULL,
151                                           0xecc0ecc0ecc0ecc0ULL);
152
153                 addr   += OBD_ECHO_BLOCK_SIZE;
154                 offset += OBD_ECHO_BLOCK_SIZE;
155                 len    -= OBD_ECHO_BLOCK_SIZE;
156         }
157
158         kunmap(page);
159 }
160
161 static int
162 echo_page_debug_check(struct page *page, u64 id,
163                       __u64 offset, int len)
164 {
165         int   page_offset = offset & ~PAGE_MASK;
166         char *addr        = ((char *)kmap(page)) + page_offset;
167         int   rc          = 0;
168         int   rc2;
169
170         if (len % OBD_ECHO_BLOCK_SIZE != 0)
171                 CERROR("Unexpected block size %d\n", len);
172
173         while (len > 0) {
174                 rc2 = block_debug_check("echo", addr, OBD_ECHO_BLOCK_SIZE,
175                                         offset, id);
176
177                 if (rc2 != 0 && rc == 0)
178                         rc = rc2;
179
180                 addr   += OBD_ECHO_BLOCK_SIZE;
181                 offset += OBD_ECHO_BLOCK_SIZE;
182                 len    -= OBD_ECHO_BLOCK_SIZE;
183         }
184
185         kunmap(page);
186
187         return rc;
188 }
189
190 static int echo_map_nb_to_lb(struct obdo *oa, struct obd_ioobj *obj,
191                              struct niobuf_remote *nb, int *pages,
192                              struct niobuf_local *lb, int cmd, int *left)
193 {
194         gfp_t gfp_mask = (ostid_id(&obj->ioo_oid) & 1) ?
195                         GFP_HIGHUSER : GFP_KERNEL;
196         int ispersistent = ostid_id(&obj->ioo_oid) == ECHO_PERSISTENT_OBJID;
197         int debug_setup = (!ispersistent &&
198                            (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
199                            (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
200         struct niobuf_local *res = lb;
201         u64 offset = nb->rnb_offset;
202         int len = nb->rnb_len;
203
204         while (len > 0) {
205                 int plen = PAGE_SIZE - (offset & (PAGE_SIZE - 1));
206
207                 if (len < plen)
208                         plen = len;
209
210                 /* check for local buf overflow */
211                 if (*left == 0)
212                         return -EINVAL;
213
214                 res->lnb_file_offset = offset;
215                 res->lnb_len = plen;
216                 LASSERT((res->lnb_file_offset & ~PAGE_MASK) +
217                         res->lnb_len <= PAGE_SIZE);
218
219                 if (ispersistent &&
220                     ((res->lnb_file_offset >> PAGE_SHIFT) <
221                       ECHO_PERSISTENT_PAGES)) {
222                         res->lnb_page =
223                                 echo_persistent_pages[res->lnb_file_offset >>
224                                                       PAGE_SHIFT];
225                         /* Take extra ref so __free_pages() can be called OK */
226                         get_page(res->lnb_page);
227                 } else {
228                         res->lnb_page = alloc_page(gfp_mask);
229                         if (!res->lnb_page) {
230                                 CERROR("can't get page for id " DOSTID"\n",
231                                        POSTID(&obj->ioo_oid));
232                                 return -ENOMEM;
233                         }
234                         /* set mapping so page is not considered encrypted */
235                         res->lnb_page->mapping = ECHO_MAPPING_UNENCRYPTED;
236                 }
237
238                 CDEBUG(D_PAGE, "$$$$ get page %p @ %llu for %d\n",
239                        res->lnb_page, res->lnb_file_offset, res->lnb_len);
240
241                 if (cmd & OBD_BRW_READ)
242                         res->lnb_rc = res->lnb_len;
243
244                 if (debug_setup)
245                         echo_page_debug_setup(res->lnb_page, cmd,
246                                               ostid_id(&obj->ioo_oid),
247                                               res->lnb_file_offset,
248                                               res->lnb_len);
249
250                 offset += plen;
251                 len -= plen;
252                 res++;
253
254                 (*left)--;
255                 (*pages)++;
256         }
257
258         return 0;
259 }
260
261 static int echo_finalize_lb(struct obdo *oa, struct obd_ioobj *obj,
262                             struct niobuf_remote *rb, int *pgs,
263                             struct niobuf_local *lb, int verify)
264 {
265         struct niobuf_local *res = lb;
266         u64 start = rb->rnb_offset >> PAGE_SHIFT;
267         u64 end   = (rb->rnb_offset + rb->rnb_len + PAGE_SIZE - 1) >>
268                     PAGE_SHIFT;
269         int     count  = (int)(end - start);
270         int     rc     = 0;
271         int     i;
272
273         for (i = 0; i < count; i++, (*pgs) ++, res++) {
274                 struct page *page = res->lnb_page;
275                 void       *addr;
276
277                 if (!page) {
278                         CERROR("null page objid %llu:%p, buf %d/%d\n",
279                                ostid_id(&obj->ioo_oid), page, i,
280                                obj->ioo_bufcnt);
281                         return -EFAULT;
282                 }
283
284                 addr = kmap(page);
285
286                 CDEBUG(D_PAGE, "$$$$ use page %p, addr %p@%llu\n",
287                        res->lnb_page, addr, res->lnb_file_offset);
288
289                 if (verify) {
290                         int vrc = echo_page_debug_check(page,
291                                                         ostid_id(&obj->ioo_oid),
292                                                         res->lnb_file_offset,
293                                                         res->lnb_len);
294                         /* check all the pages always */
295                         if (vrc != 0 && rc == 0)
296                                 rc = vrc;
297                 }
298
299                 kunmap(page);
300                 /* NB see comment above regarding persistent pages */
301                 __free_page(page);
302         }
303
304         return rc;
305 }
306
307 static int echo_preprw(const struct lu_env *env, int cmd,
308                        struct obd_export *export, struct obdo *oa,
309                        int objcount, struct obd_ioobj *obj,
310                        struct niobuf_remote *nb, int *pages,
311                        struct niobuf_local *res)
312 {
313         struct obd_device *obd;
314         int tot_bytes = 0;
315         int rc = 0;
316         int i, left;
317
318         ENTRY;
319
320         obd = export->exp_obd;
321         if (!obd)
322                 RETURN(-EINVAL);
323
324         /* Temp fix to stop falling foul of osc_announce_cached() */
325         oa->o_valid &= ~(OBD_MD_FLBLOCKS | OBD_MD_FLGRANT);
326
327         memset(res, 0, sizeof(*res) * *pages);
328
329         CDEBUG(D_PAGE, "%s %d obdos with %d IOs\n",
330                cmd == OBD_BRW_READ ? "reading" : "writing", objcount, *pages);
331
332         left = *pages;
333         *pages = 0;
334
335         for (i = 0; i < objcount; i++, obj++) {
336                 int j;
337
338                 for (j = 0 ; j < obj->ioo_bufcnt ; j++, nb++) {
339                         rc = echo_map_nb_to_lb(oa, obj, nb, pages,
340                                                res + *pages, cmd, &left);
341                         if (rc)
342                                 GOTO(preprw_cleanup, rc);
343
344                         tot_bytes += nb->rnb_len;
345                 }
346         }
347
348         atomic_add(*pages, &obd2echo(obd)->eo_prep);
349
350         if (cmd & OBD_BRW_READ)
351                 lprocfs_counter_add(obd->obd_stats, LPROC_ECHO_READ_BYTES,
352                                     tot_bytes);
353         else
354                 lprocfs_counter_add(obd->obd_stats, LPROC_ECHO_WRITE_BYTES,
355                                     tot_bytes);
356
357         CDEBUG(D_PAGE, "%d pages allocated after prep\n",
358                atomic_read(&obd2echo(obd)->eo_prep));
359
360         RETURN(0);
361
362 preprw_cleanup:
363         /*
364          * It is possible that we would rather handle errors by  allow
365          * any already-set-up pages to complete, rather than tearing them
366          * all down again.  I believe that this is what the in-kernel
367          * prep/commit operations do.
368          */
369         CERROR("cleaning up %u pages (%d obdos)\n", *pages, objcount);
370         for (i = 0; i < *pages; i++) {
371                 kunmap(res[i].lnb_page);
372                 /*
373                  * NB if this is a persistent page, __free_page() will just
374                  * lose the extra ref gained above
375                  */
376                 __free_page(res[i].lnb_page);
377                 res[i].lnb_page = NULL;
378                 atomic_dec(&obd2echo(obd)->eo_prep);
379         }
380
381         return rc;
382 }
383
384 static int echo_commitrw(const struct lu_env *env, int cmd,
385                          struct obd_export *export, struct obdo *oa,
386                          int objcount, struct obd_ioobj *obj,
387                          struct niobuf_remote *rb, int niocount,
388                          struct niobuf_local *res, int rc, int nob,
389                          ktime_t kstart)
390 {
391         struct obd_device *obd;
392         int pgs = 0;
393         int i;
394
395         ENTRY;
396
397         obd = export->exp_obd;
398         if (!obd)
399                 RETURN(-EINVAL);
400
401         if (rc)
402                 GOTO(commitrw_cleanup, rc);
403
404         if (cmd == OBD_BRW_READ) {
405                 CDEBUG(D_PAGE, "reading %d obdos with %d IOs\n",
406                        objcount, niocount);
407         } else {
408                 CDEBUG(D_PAGE, "writing %d obdos with %d IOs\n",
409                        objcount, niocount);
410         }
411
412         if (niocount && !res) {
413                 CERROR("NULL res niobuf with niocount %d\n", niocount);
414                 RETURN(-EINVAL);
415         }
416
417         for (i = 0; i < objcount; i++, obj++) {
418                 int verify = (rc == 0 &&
419                              ostid_id(&obj->ioo_oid) != ECHO_PERSISTENT_OBJID &&
420                               (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
421                               (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
422                 int j;
423
424                 for (j = 0 ; j < obj->ioo_bufcnt ; j++, rb++) {
425                         int vrc = echo_finalize_lb(oa, obj, rb, &pgs, &res[pgs],
426                                                    verify);
427                         if (vrc == 0)
428                                 continue;
429
430                         if (vrc == -EFAULT)
431                                 GOTO(commitrw_cleanup, rc = vrc);
432
433                         if (rc == 0)
434                                 rc = vrc;
435                 }
436         }
437
438         atomic_sub(pgs, &obd2echo(obd)->eo_prep);
439
440         CDEBUG(D_PAGE, "%d pages remain after commit\n",
441                atomic_read(&obd2echo(obd)->eo_prep));
442         RETURN(rc);
443
444 commitrw_cleanup:
445         atomic_sub(pgs, &obd2echo(obd)->eo_prep);
446
447         CERROR("cleaning up %d pages (%d obdos)\n",
448                niocount - pgs - 1, objcount);
449
450         while (pgs < niocount) {
451                 struct page *page = res[pgs++].lnb_page;
452
453                 if (!page)
454                         continue;
455
456                 /* NB see comment above regarding persistent pages */
457                 __free_page(page);
458                 atomic_dec(&obd2echo(obd)->eo_prep);
459         }
460         return rc;
461 }
462
463 LPROC_SEQ_FOPS_RO_TYPE(echo, uuid);
464 static struct lprocfs_vars lprocfs_echo_obd_vars[] = {
465         { .name =       "uuid",
466           .fops =       &echo_uuid_fops         },
467         { NULL }
468 };
469
470 const struct obd_ops echo_obd_ops = {
471         .o_owner           = THIS_MODULE,
472         .o_connect         = echo_connect,
473         .o_disconnect      = echo_disconnect,
474         .o_init_export     = echo_init_export,
475         .o_destroy_export  = echo_destroy_export,
476         .o_preprw          = echo_preprw,
477         .o_commitrw        = echo_commitrw,
478 };
479
480 /**
481  * Echo Server request handler for OST_CREATE RPC.
482  *
483  * This is part of request processing. Its simulates the object
484  * creation on OST.
485  *
486  * \param[in] tsi       target session environment for this request
487  *
488  * \retval              0 if successful
489  * \retval              negative value on error
490  */
491 static int esd_create_hdl(struct tgt_session_info *tsi)
492 {
493         const struct obdo *oa = &tsi->tsi_ost_body->oa;
494         struct obd_device *obd = tsi->tsi_exp->exp_obd;
495         struct ost_body *repbody;
496         struct obdo *rep_oa;
497
498         ENTRY;
499
500         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
501         if (!repbody)
502                 RETURN(-ENOMEM);
503
504         if (!(oa->o_mode & S_IFMT)) {
505                 CERROR("%s: no type is set in obdo!\n",
506                        tsi->tsi_exp->exp_obd->obd_name);
507                 RETURN(-ENOENT);
508         }
509
510         if (!(oa->o_valid & OBD_MD_FLTYPE)) {
511                 CERROR("%s: invalid o_valid in obdo: %#llx\n",
512                        tsi->tsi_exp->exp_obd->obd_name, oa->o_valid);
513                 RETURN(-EINVAL);
514         }
515
516         rep_oa = &repbody->oa;
517
518         if (!fid_seq_is_echo(ostid_seq(&oa->o_oi))) {
519                 CERROR("%s: invalid seq %#llx\n",
520                        tsi->tsi_exp->exp_obd->obd_name, ostid_seq(&oa->o_oi));
521                 return -EINVAL;
522         }
523
524         ostid_set_seq_echo(&rep_oa->o_oi);
525         ostid_set_id(&rep_oa->o_oi, echo_next_id(obd));
526
527         CDEBUG(D_INFO, "%s: Create object "DOSTID"\n",
528                tsi->tsi_exp->exp_obd->obd_name, POSTID(&rep_oa->o_oi));
529
530         rep_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
531
532         RETURN(0);
533 }
534
535 /**
536  * Echo Server request handler for OST_DESTROY RPC.
537  *
538  * This is Echo Server part of request handling. It simulates the objects
539  * destroy on OST.
540  *
541  * \param[in] tsi       target session environment for this request
542  *
543  * \retval              0 if successful
544  * \retval              negative value on error
545  */
546 static int esd_destroy_hdl(struct tgt_session_info *tsi)
547 {
548         const struct obdo *oa = &tsi->tsi_ost_body->oa;
549         struct obd_device *obd = tsi->tsi_exp->exp_obd;
550         struct ost_body *repbody;
551         u64 oid;
552
553         ENTRY;
554
555         oid = ostid_id(&oa->o_oi);
556         LASSERT(oid != 0);
557
558         if (!(oa->o_valid & OBD_MD_FLID)) {
559                 CERROR("%s: obdo missing FLID valid flag: %#llx\n",
560                        tsi->tsi_exp->exp_obd->obd_name, oa->o_valid);
561                 RETURN(-EINVAL);
562         }
563
564         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
565
566         if (ostid_id(&oa->o_oi) > obd2echo(obd)->eo_lastino ||
567             ostid_id(&oa->o_oi) < ECHO_INIT_OID) {
568                 CERROR("%s: bad objid to destroy: "DOSTID"\n",
569                        tsi->tsi_exp->exp_obd->obd_name, POSTID(&oa->o_oi));
570                 RETURN(-EINVAL);
571         }
572
573         CDEBUG(D_INFO, "%s: Destroy object "DOSTID"\n",
574                tsi->tsi_exp->exp_obd->obd_name, POSTID(&oa->o_oi));
575
576         repbody->oa.o_oi = oa->o_oi;
577         RETURN(0);
578 }
579
580 /**
581  * Echo Server request handler for OST_GETATTR RPC.
582  *
583  * This is Echo Server part of request handling. It returns an object
584  * attributes to the client. All objects have the same attributes in
585  * Echo Server.
586  *
587  * \param[in] tsi       target session environment for this request
588  *
589  * \retval              0 if successful
590  * \retval              negative value on error
591  */
592 static int esd_getattr_hdl(struct tgt_session_info *tsi)
593 {
594         const struct obdo *oa = &tsi->tsi_ost_body->oa;
595         struct obd_device *obd = tsi->tsi_exp->exp_obd;
596         struct ost_body *repbody;
597
598         ENTRY;
599
600         if (!(oa->o_valid & OBD_MD_FLID)) {
601                 CERROR("%s: obdo missing FLID valid flag: %#llx\n",
602                        tsi->tsi_exp->exp_obd->obd_name, oa->o_valid);
603                 RETURN(-EINVAL);
604         }
605
606         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
607         if (!repbody)
608                 RETURN(-ENOMEM);
609
610         repbody->oa.o_oi = oa->o_oi;
611         repbody->oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
612
613         obdo_cpy_md(&repbody->oa, &obd2echo(obd)->eo_oa, oa->o_valid);
614
615         repbody->oa.o_valid |= OBD_MD_FLFLAGS;
616         repbody->oa.o_flags = OBD_FL_FLUSH;
617
618         RETURN(0);
619 }
620
621 /**
622  * Echo Server request handler for OST_SETATTR RPC.
623  *
624  * This is Echo Server part of request handling. It sets common
625  * attributes from request to the Echo Server objects.
626  *
627  * \param[in] tsi       target session environment for this request
628  *
629  * \retval              0 if successful
630  * \retval              negative value on error
631  */
632 static int esd_setattr_hdl(struct tgt_session_info *tsi)
633 {
634         struct ost_body *body = tsi->tsi_ost_body;
635         struct obd_device *obd = tsi->tsi_exp->exp_obd;
636         struct ost_body *repbody;
637
638         ENTRY;
639
640         if (!(body->oa.o_valid & OBD_MD_FLID)) {
641                 CERROR("%s: obdo missing FLID valid flag: %#llx\n",
642                        tsi->tsi_exp->exp_obd->obd_name,
643                        body->oa.o_valid);
644                 RETURN(-EINVAL);
645         }
646
647         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
648         if (!repbody)
649                 RETURN(-ENOMEM);
650
651         repbody->oa.o_oi = body->oa.o_oi;
652         repbody->oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
653
654         obd2echo(obd)->eo_oa = body->oa;
655
656         RETURN(0);
657 }
658
659 #define OBD_FAIL_OST_READ_NET   OBD_FAIL_OST_BRW_NET
660 #define OBD_FAIL_OST_WRITE_NET  OBD_FAIL_OST_BRW_NET
661 #define OST_BRW_READ    OST_READ
662 #define OST_BRW_WRITE   OST_WRITE
663
664 /**
665  * Table of Echo Server specific request handlers
666  *
667  * This table contains all opcodes accepted by Echo Server and
668  * specifies handlers for them. The tgt_request_handler()
669  * uses such table from each target to process incoming
670  * requests.
671  */
672 static struct tgt_handler esd_tgt_handlers[] = {
673 TGT_RPC_HANDLER(OST_FIRST_OPC, 0, OST_CONNECT, tgt_connect,
674                 &RQF_CONNECT, LUSTRE_OBD_VERSION),
675 TGT_RPC_HANDLER(OST_FIRST_OPC, 0, OST_DISCONNECT, tgt_disconnect,
676                 &RQF_OST_DISCONNECT, LUSTRE_OBD_VERSION),
677 TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_GETATTR, esd_getattr_hdl),
678 TGT_OST_HDL(HAS_BODY | HAS_REPLY | IS_MUTABLE, OST_SETATTR,
679             esd_setattr_hdl),
680 TGT_OST_HDL(HAS_REPLY | IS_MUTABLE, OST_CREATE, esd_create_hdl),
681 TGT_OST_HDL(HAS_REPLY | IS_MUTABLE, OST_DESTROY, esd_destroy_hdl),
682 TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_BRW_READ, tgt_brw_read),
683 TGT_OST_HDL(HAS_BODY | IS_MUTABLE, OST_BRW_WRITE, tgt_brw_write),
684 };
685
686 static struct tgt_opc_slice esd_common_slice[] = {
687         {
688                 .tos_opc_start  = OST_FIRST_OPC,
689                 .tos_opc_end    = OST_LAST_OPC,
690                 .tos_hs         = esd_tgt_handlers
691         },
692         {
693                 .tos_opc_start  = OBD_FIRST_OPC,
694                 .tos_opc_end    = OBD_LAST_OPC,
695                 .tos_hs         = tgt_obd_handlers
696         },
697         {
698                 .tos_opc_start  = LDLM_FIRST_OPC,
699                 .tos_opc_end    = LDLM_LAST_OPC,
700                 .tos_hs         = tgt_dlm_handlers
701         },
702         {
703                 .tos_opc_start  = SEC_FIRST_OPC,
704                 .tos_opc_end    = SEC_LAST_OPC,
705                 .tos_hs         = tgt_sec_ctx_handlers
706         },
707         {
708                 .tos_hs         = NULL
709         }
710 };
711
712 /**
713  * lu_device_operations matrix for ECHO SRV device is NULL,
714  * this device is just serving incoming requests immediately
715  * without building a stack of lu_devices.
716  */
717 static const struct lu_device_operations echo_srv_lu_ops = { 0 };
718
719 /**
720  * Initialize Echo Server device with parameters in the config log \a cfg.
721  *
722  * This is the main starting point of Echo Server initialization. It fills all
723  * parameters with their initial values and starts Echo Server.
724  *
725  * \param[in] env       execution environment
726  * \param[in] m         Echo Server device
727  * \param[in] ldt       LU device type of Echo Server
728  * \param[in] cfg       configuration log
729  *
730  * \retval              0 if successful
731  * \retval              negative value on error
732  */
733 static int echo_srv_init0(const struct lu_env *env,
734                           struct echo_srv_device *esd,
735                           struct lu_device_type *ldt, struct lustre_cfg *cfg)
736 {
737         const char *dev = lustre_cfg_string(cfg, 0);
738         struct obd_device *obd;
739         char ns_name[48];
740         int rc;
741
742         ENTRY;
743
744         obd = class_name2obd(dev);
745         if (!obd) {
746                 CERROR("Cannot find obd with name %s\n", dev);
747                 RETURN(-ENODEV);
748         }
749
750         spin_lock_init(&obd2echo(obd)->eo_lock);
751         obd2echo(obd)->eo_lastino = ECHO_INIT_OID;
752
753         esd->esd_dev.ld_ops = &echo_srv_lu_ops;
754         esd->esd_dev.ld_obd = obd;
755         /* set this lu_device to obd, because error handling need it */
756         obd->obd_lu_dev = &esd->esd_dev;
757
758         /* No connection accepted until configurations will finish */
759         spin_lock(&obd->obd_dev_lock);
760         obd->obd_no_conn = 1;
761         spin_unlock(&obd->obd_dev_lock);
762
763         /* non-replayable target */
764         obd->obd_replayable = 0;
765
766         snprintf(ns_name, sizeof(ns_name), "echotgt-%s", obd->obd_uuid.uuid);
767         obd->obd_namespace = ldlm_namespace_new(obd, ns_name,
768                                                 LDLM_NAMESPACE_SERVER,
769                                                 LDLM_NAMESPACE_MODEST,
770                                                 LDLM_NS_TYPE_OST);
771         if (IS_ERR(obd->obd_namespace)) {
772                 rc = PTR_ERR(obd->obd_namespace);
773                 CERROR("%s: unable to create server namespace: rc = %d\n",
774                        obd->obd_name, rc);
775                 obd->obd_namespace = NULL;
776                 RETURN(rc);
777         }
778
779         obd->obd_vars = lprocfs_echo_obd_vars;
780         if (!lprocfs_obd_setup(obd, true) &&
781             lprocfs_alloc_obd_stats(obd, LPROC_ECHO_LAST) == 0) {
782                 lprocfs_counter_init(obd->obd_stats, LPROC_ECHO_READ_BYTES,
783                                      LPROCFS_CNTR_AVGMINMAX|LPROCFS_TYPE_BYTES,
784                                      "read_bytes");
785                 lprocfs_counter_init(obd->obd_stats, LPROC_ECHO_WRITE_BYTES,
786                                      LPROCFS_CNTR_AVGMINMAX|LPROCFS_TYPE_BYTES,
787                                      "write_bytes");
788         }
789
790         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
791                            "echo_ldlm_cb_client", &obd->obd_ldlm_client);
792
793         rc = tgt_init(env, &esd->esd_lut, obd, NULL, esd_common_slice,
794                       OBD_FAIL_OST_ALL_REQUEST_NET,
795                       OBD_FAIL_OST_ALL_REPLY_NET);
796         if (rc)
797                 GOTO(err_out, rc);
798
799         spin_lock(&obd->obd_dev_lock);
800         obd->obd_no_conn = 0;
801         spin_unlock(&obd->obd_dev_lock);
802
803         RETURN(0);
804
805 err_out:
806         ldlm_namespace_free(obd->obd_namespace, NULL, obd->obd_force);
807         obd->obd_namespace = NULL;
808
809         lprocfs_obd_cleanup(obd);
810         lprocfs_free_obd_stats(obd);
811         RETURN(rc);
812 }
813
814 /**
815  * Stop the Echo Server device.
816  *
817  * This function stops the Echo Server device and all its subsystems.
818  * This is the end of Echo Server lifecycle.
819  *
820  * \param[in] env       execution environment
821  * \param[in] esd               ESD device
822  */
823 static void echo_srv_fini(const struct lu_env *env,
824                           struct echo_srv_device *esd)
825 {
826         struct obd_device *obd = echo_srv_obd(esd);
827         struct lu_device *d = &esd->esd_dev;
828         int leaked;
829
830         ENTRY;
831
832         class_disconnect_exports(obd);
833         if (obd->obd_namespace)
834                 ldlm_namespace_free_prior(obd->obd_namespace, NULL,
835                                           obd->obd_force);
836
837         obd_exports_barrier(obd);
838         obd_zombie_barrier();
839
840         tgt_fini(env, &esd->esd_lut);
841
842         if (obd->obd_namespace) {
843                 ldlm_namespace_free_post(obd->obd_namespace);
844                 obd->obd_namespace = NULL;
845         }
846
847         lprocfs_obd_cleanup(obd);
848         lprocfs_free_obd_stats(obd);
849
850         leaked = atomic_read(&obd2echo(obd)->eo_prep);
851         if (leaked != 0)
852                 CERROR("%d prep/commitrw pages leaked\n", leaked);
853
854         LASSERT(atomic_read(&d->ld_ref) == 0);
855         EXIT;
856 }
857
858 /**
859  * Implementation of lu_device_type_operations::ldto_device_fini.
860  *
861  * Finalize device. Dual to echo_srv_device_init(). It is called from
862  * obd_precleanup() and stops the current device.
863  *
864  * \param[in] env       execution environment
865  * \param[in] d         LU device of ESD
866  *
867  * \retval              NULL
868  */
869 static struct lu_device *echo_srv_device_fini(const struct lu_env *env,
870                                               struct lu_device *d)
871 {
872         ENTRY;
873         echo_srv_fini(env, echo_srv_dev(d));
874         RETURN(NULL);
875 }
876
877 /**
878  * Implementation of lu_device_type_operations::ldto_device_free.
879  *
880  * Free Echo Server device. Dual to echo_srv_device_alloc().
881  *
882  * \param[in] env       execution environment
883  * \param[in] d         LU device of ESD
884  *
885  * \retval              NULL
886  */
887 static struct lu_device *echo_srv_device_free(const struct lu_env *env,
888                                               struct lu_device *d)
889 {
890         struct echo_srv_device *esd = echo_srv_dev(d);
891
892         lu_device_fini(&esd->esd_dev);
893         OBD_FREE_PTR(esd);
894         RETURN(NULL);
895 }
896
897 /**
898  * Implementation of lu_device_type_operations::ldto_device_alloc.
899  *
900  * This function allocates the new Echo Server device. It is called from
901  * obd_setup() if OBD device had lu_device_type defined.
902  *
903  * \param[in] env       execution environment
904  * \param[in] t         lu_device_type of ESD device
905  * \param[in] cfg       configuration log
906  *
907  * \retval              pointer to the lu_device of just allocated OFD
908  * \retval              ERR_PTR of return value on error
909  */
910 static struct lu_device *echo_srv_device_alloc(const struct lu_env *env,
911                                                struct lu_device_type *t,
912                                                struct lustre_cfg *cfg)
913 {
914         struct echo_srv_device *esd;
915         struct lu_device *l;
916         int rc;
917
918         OBD_ALLOC_PTR(esd);
919         if (!esd)
920                 return ERR_PTR(-ENOMEM);
921
922         l = &esd->esd_dev;
923         lu_device_init(l, t);
924         rc = echo_srv_init0(env, esd, t, cfg);
925         if (rc != 0) {
926                 echo_srv_device_free(env, l);
927                 l = ERR_PTR(rc);
928         }
929
930         return l;
931 }
932
933 static const struct lu_device_type_operations echo_srv_type_ops = {
934         .ldto_device_alloc = echo_srv_device_alloc,
935         .ldto_device_free = echo_srv_device_free,
936         .ldto_device_fini = echo_srv_device_fini
937 };
938
939 struct lu_device_type echo_srv_type = {
940         .ldt_tags = LU_DEVICE_DT,
941         .ldt_name = LUSTRE_ECHO_NAME,
942         .ldt_ops = &echo_srv_type_ops,
943         .ldt_ctx_tags = LCT_DT_THREAD,
944 };
945
946 void echo_persistent_pages_fini(void)
947 {
948         int i;
949
950         for (i = 0; i < ECHO_PERSISTENT_PAGES; i++)
951                 if (echo_persistent_pages[i]) {
952                         __free_page(echo_persistent_pages[i]);
953                         echo_persistent_pages[i] = NULL;
954                 }
955 }
956
957 int echo_persistent_pages_init(void)
958 {
959         struct page *pg;
960         int          i;
961
962         for (i = 0; i < ECHO_PERSISTENT_PAGES; i++) {
963                 gfp_t gfp_mask = (i < ECHO_PERSISTENT_PAGES / 2) ?
964                         GFP_KERNEL : GFP_HIGHUSER;
965
966                 pg = alloc_page(gfp_mask);
967                 if (!pg) {
968                         echo_persistent_pages_fini();
969                         return -ENOMEM;
970                 }
971
972                 memset(kmap(pg), 0, PAGE_SIZE);
973                 kunmap(pg);
974                 /* set mapping so page is not considered encrypted */
975                 pg->mapping = ECHO_MAPPING_UNENCRYPTED;
976
977                 echo_persistent_pages[i] = pg;
978         }
979
980         return 0;
981 }