Whamcloud - gitweb
5a8165cec405af877c085b2fae48871d30c9aac7
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #ifndef __KERNEL__
42 # include <liblustre.h>
43 #endif
44
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_ioctl.h>
57 #include <lustre_log.h>
58 #include <lustre_debug.h>
59 #include <lustre_param.h>
60 #include <lustre_fid.h>
61 #include "osc_internal.h"
62 #include "osc_cl_internal.h"
63
/* Per-request state for an asynchronous bulk read/write (BRW) RPC; stored
 * in ptlrpc_request::rq_async_args and consumed by brw_interpret(). */
struct osc_brw_async_args {
        struct obdo              *aa_oa;            /* attributes sent with / returned by the RPC */
        int                       aa_requested_nob; /* bytes requested (presumably total I/O size — verify) */
        int                       aa_nio_count;     /* niobuf entry count */
        obd_count                 aa_page_count;    /* number of pages in aa_ppga */
        int                       aa_resends;       /* resend attempts made so far */
        struct brw_page         **aa_ppga;          /* page array; released via osc_release_ppga() */
        struct client_obd        *aa_cli;           /* owning client obd */
        struct list_head          aa_oaps;          /* async pages belonging to this RPC */
        struct list_head          aa_exts;          /* extents covered by this RPC */
        struct obd_capa          *aa_ocapa;         /* capability, when security is enabled */
        struct cl_req            *aa_clerq;         /* cl_req transfer descriptor */
};
77
78 #define osc_grant_args osc_brw_async_args
79
/* Async-args cookie for simple OSC RPCs: carries the obd_info whose
 * oi_cb_up is invoked from the reply interpreter. */
struct osc_async_args {
        struct obd_info *aa_oi;
};
83
/* Reply-interpreter state for setattr/punch RPCs. */
struct osc_setattr_args {
        struct obdo             *sa_oa;     /* attributes; refreshed from the reply body */
        obd_enqueue_update_f     sa_upcall; /* completion callback */
        void                    *sa_cookie; /* opaque argument passed to sa_upcall */
};
89
/* Reply-interpreter state for OST_SYNC RPCs. */
struct osc_fsync_args {
        struct obd_info         *fa_oi;     /* fa_oi->oi_oa receives the reply attributes */
        obd_enqueue_update_f     fa_upcall; /* completion callback */
        void                    *fa_cookie; /* opaque argument passed to fa_upcall */
};
95
/* State carried across an asynchronous LDLM enqueue on an OSC object. */
struct osc_enqueue_args {
        struct obd_export               *oa_exp;    /* export the lock is requested on */
        __u64                           *oa_flags;  /* LDLM flags (in/out — confirm direction at callers) */
        obd_enqueue_update_f             oa_upcall; /* completion callback */
        void                            *oa_cookie; /* opaque argument for oa_upcall */
        struct ost_lvb                  *oa_lvb;    /* lock value block returned by the server */
        struct lustre_handle            *oa_lockh;  /* where the granted lock handle is stored */
        struct ldlm_enqueue_info        *oa_ei;     /* enqueue parameters */
        unsigned int                     oa_agl:1;  /* set for AGL (async glimpse lock) requests */
};
106
107 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
108 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
109                          void *data, int rc);
110
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Behavior depends on the argument combination:
 *   - @lsmp == NULL: return only the required lsm size;
 *   - *@lsmp != NULL and @lmm == NULL: free the in-memory lsm, return 0;
 *   - otherwise (re)fill a single-stripe lsm from the wire structure @lmm.
 *
 * Returns the lsm size on success or a negative errno on failure.
 */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        struct obd_import *imp = class_exp2cliimp(exp);
        ENTRY;

        if (lmm != NULL) {
                /* Reject buffers too small to hold the fixed header. */
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("%s: lov_mds_md too small: %d, need %d\n",
                               exp->exp_obd->obd_name, lmm_bytes,
                               (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                /* Object id 0 is never valid on the wire. */
                if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
                        CERROR("%s: zero lmm_object_id: rc = %d\n",
                               exp->exp_obd->obd_name, -EINVAL);
                        RETURN(-EINVAL);
                }
        }

        /* OSC objects always have exactly one stripe. */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        /* Free request: existing lsm, no metadata supplied. */
        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (unlikely(*lsmp == NULL))
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
                        /* Partial allocation: undo the lsm itself. */
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
                /* A pre-existing lsm must already carry a valid object id. */
                RETURN(-EBADF);
        }

        if (lmm != NULL)
                /* XXX zero *lsmp? */
                ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);

        /* Take the object size limit from the server when it advertises
         * OBD_CONNECT_MAXBYTES, otherwise fall back to the static cap. */
        if (imp != NULL &&
            (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
                (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
        else
                (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
172
173 static inline void osc_pack_capa(struct ptlrpc_request *req,
174                                  struct ost_body *body, void *capa)
175 {
176         struct obd_capa *oc = (struct obd_capa *)capa;
177         struct lustre_capa *c;
178
179         if (!capa)
180                 return;
181
182         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
183         LASSERT(c);
184         capa_cpy(c, oc);
185         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
186         DEBUG_CAPA(D_SEC, c, "pack");
187 }
188
189 static inline void osc_pack_req_body(struct ptlrpc_request *req,
190                                      struct obd_info *oinfo)
191 {
192         struct ost_body *body;
193
194         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
195         LASSERT(body);
196
197         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
198                              oinfo->oi_oa);
199         osc_pack_capa(req, body, oinfo->oi_capa);
200 }
201
202 static inline void osc_set_capa_size(struct ptlrpc_request *req,
203                                      const struct req_msg_field *field,
204                                      struct obd_capa *oc)
205 {
206         if (oc == NULL)
207                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
208         else
209                 /* it is already calculated as sizeof struct obd_capa */
210                 ;
211 }
212
213 static int osc_getattr_interpret(const struct lu_env *env,
214                                  struct ptlrpc_request *req,
215                                  struct osc_async_args *aa, int rc)
216 {
217         struct ost_body *body;
218         ENTRY;
219
220         if (rc != 0)
221                 GOTO(out, rc);
222
223         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
224         if (body) {
225                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
226                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
227                                      aa->aa_oi->oi_oa, &body->oa);
228
229                 /* This should really be sent by the OST */
230                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
231                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
232         } else {
233                 CDEBUG(D_INFO, "can't unpack ost_body\n");
234                 rc = -EPROTO;
235                 aa->aa_oi->oi_oa->o_valid = 0;
236         }
237 out:
238         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
239         RETURN(rc);
240 }
241
242 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
243                              struct ptlrpc_request_set *set)
244 {
245         struct ptlrpc_request *req;
246         struct osc_async_args *aa;
247         int                    rc;
248         ENTRY;
249
250         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
251         if (req == NULL)
252                 RETURN(-ENOMEM);
253
254         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
255         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
256         if (rc) {
257                 ptlrpc_request_free(req);
258                 RETURN(rc);
259         }
260
261         osc_pack_req_body(req, oinfo);
262
263         ptlrpc_request_set_replen(req);
264         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
265
266         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
267         aa = ptlrpc_req_async_args(req);
268         aa->aa_oi = oinfo;
269
270         ptlrpc_set_add_req(set, req);
271         RETURN(0);
272 }
273
274 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
275                        struct obd_info *oinfo)
276 {
277         struct ptlrpc_request *req;
278         struct ost_body       *body;
279         int                    rc;
280         ENTRY;
281
282         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
283         if (req == NULL)
284                 RETURN(-ENOMEM);
285
286         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
287         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
288         if (rc) {
289                 ptlrpc_request_free(req);
290                 RETURN(rc);
291         }
292
293         osc_pack_req_body(req, oinfo);
294
295         ptlrpc_request_set_replen(req);
296
297         rc = ptlrpc_queue_wait(req);
298         if (rc)
299                 GOTO(out, rc);
300
301         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
302         if (body == NULL)
303                 GOTO(out, rc = -EPROTO);
304
305         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
306         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
307                              &body->oa);
308
309         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
310         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
311
312         EXIT;
313  out:
314         ptlrpc_req_finished(req);
315         return rc;
316 }
317
318 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
319                        struct obd_info *oinfo, struct obd_trans_info *oti)
320 {
321         struct ptlrpc_request *req;
322         struct ost_body       *body;
323         int                    rc;
324         ENTRY;
325
326         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
327
328         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
329         if (req == NULL)
330                 RETURN(-ENOMEM);
331
332         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
333         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
334         if (rc) {
335                 ptlrpc_request_free(req);
336                 RETURN(rc);
337         }
338
339         osc_pack_req_body(req, oinfo);
340
341         ptlrpc_request_set_replen(req);
342
343         rc = ptlrpc_queue_wait(req);
344         if (rc)
345                 GOTO(out, rc);
346
347         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
348         if (body == NULL)
349                 GOTO(out, rc = -EPROTO);
350
351         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
352                              &body->oa);
353
354         EXIT;
355 out:
356         ptlrpc_req_finished(req);
357         RETURN(rc);
358 }
359
360 static int osc_setattr_interpret(const struct lu_env *env,
361                                  struct ptlrpc_request *req,
362                                  struct osc_setattr_args *sa, int rc)
363 {
364         struct ost_body *body;
365         ENTRY;
366
367         if (rc != 0)
368                 GOTO(out, rc);
369
370         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
371         if (body == NULL)
372                 GOTO(out, rc = -EPROTO);
373
374         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
375                              &body->oa);
376 out:
377         rc = sa->sa_upcall(sa->sa_cookie, rc);
378         RETURN(rc);
379 }
380
/* Send an OST_SETATTR asynchronously.
 *
 * If @rqset is NULL the request is handed to ptlrpcd with no reply
 * processing; otherwise @upcall(@cookie, rc) is invoked from
 * osc_setattr_interpret() when the reply arrives, and the request is
 * queued on @rqset (or on ptlrpcd for the PTLRPCD_SET sentinel).  When
 * @oti carries a llog cookie it is copied into the obdo before packing.
 * Returns 0 on success or a negative errno. */
int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                /* Args must fit in the request's embedded async-args space. */
                CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
431
432 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
433                              struct obd_trans_info *oti,
434                              struct ptlrpc_request_set *rqset)
435 {
436         return osc_setattr_async_base(exp, oinfo, oti,
437                                       oinfo->oi_cb_up, oinfo, rqset);
438 }
439
/* Synchronously create one OST object described by @oa.
 *
 * If *@ea is NULL a lov_stripe_md is allocated here for the call and
 * freed again on failure.  On success the new object id is stored in the
 * lsm and *@ea is set; when @oti is given and the reply carries a llog
 * cookie, the cookie is saved in @oti for later cancellation.
 * Returns 0 on success or a negative errno. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                /* No lsm supplied: allocate a temporary one. */
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        /* Blocksize is not returned by the OST; derive it locally. */
        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_oi = oa->o_oi;
        *ea = lsm;

        if (oti != NULL) {
                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (oti->oti_logcookies == NULL)
                                oti->oti_logcookies = &oti->oti_onecookie;

                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* Free the lsm only if we allocated it here (*ea still NULL). */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
523
524 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
525                    obd_enqueue_update_f upcall, void *cookie,
526                    struct ptlrpc_request_set *rqset)
527 {
528         struct ptlrpc_request   *req;
529         struct osc_setattr_args *sa;
530         struct ost_body         *body;
531         int                      rc;
532         ENTRY;
533
534         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
535         if (req == NULL)
536                 RETURN(-ENOMEM);
537
538         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
539         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
540         if (rc) {
541                 ptlrpc_request_free(req);
542                 RETURN(rc);
543         }
544         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
545         ptlrpc_at_set_req_timeout(req);
546
547         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
548         LASSERT(body);
549         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
550                              oinfo->oi_oa);
551         osc_pack_capa(req, body, oinfo->oi_capa);
552
553         ptlrpc_request_set_replen(req);
554
555         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
556         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
557         sa = ptlrpc_req_async_args(req);
558         sa->sa_oa     = oinfo->oi_oa;
559         sa->sa_upcall = upcall;
560         sa->sa_cookie = cookie;
561         if (rqset == PTLRPCD_SET)
562                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
563         else
564                 ptlrpc_set_add_req(rqset, req);
565
566         RETURN(0);
567 }
568
569 static int osc_sync_interpret(const struct lu_env *env,
570                               struct ptlrpc_request *req,
571                               void *arg, int rc)
572 {
573         struct osc_fsync_args *fa = arg;
574         struct ost_body *body;
575         ENTRY;
576
577         if (rc)
578                 GOTO(out, rc);
579
580         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
581         if (body == NULL) {
582                 CERROR ("can't unpack ost_body\n");
583                 GOTO(out, rc = -EPROTO);
584         }
585
586         *fa->fa_oi->oi_oa = body->oa;
587 out:
588         rc = fa->fa_upcall(fa->fa_cookie, rc);
589         RETURN(rc);
590 }
591
592 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
593                   obd_enqueue_update_f upcall, void *cookie,
594                   struct ptlrpc_request_set *rqset)
595 {
596         struct ptlrpc_request *req;
597         struct ost_body       *body;
598         struct osc_fsync_args *fa;
599         int                    rc;
600         ENTRY;
601
602         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
603         if (req == NULL)
604                 RETURN(-ENOMEM);
605
606         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
607         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
608         if (rc) {
609                 ptlrpc_request_free(req);
610                 RETURN(rc);
611         }
612
613         /* overload the size and blocks fields in the oa with start/end */
614         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
615         LASSERT(body);
616         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
617                              oinfo->oi_oa);
618         osc_pack_capa(req, body, oinfo->oi_capa);
619
620         ptlrpc_request_set_replen(req);
621         req->rq_interpret_reply = osc_sync_interpret;
622
623         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
624         fa = ptlrpc_req_async_args(req);
625         fa->fa_oi = oinfo;
626         fa->fa_upcall = upcall;
627         fa->fa_cookie = cookie;
628
629         if (rqset == PTLRPCD_SET)
630                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
631         else
632                 ptlrpc_set_add_req(rqset, req);
633
634         RETURN (0);
635 }
636
637 /* Find and cancel locally locks matched by @mode in the resource found by
638  * @objid. Found locks are added into @cancel list. Returns the amount of
639  * locks added to @cancels list. */
640 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
641                                    struct list_head *cancels,
642                                    ldlm_mode_t mode, __u64 lock_flags)
643 {
644         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
645         struct ldlm_res_id res_id;
646         struct ldlm_resource *res;
647         int count;
648         ENTRY;
649
650         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
651          * export) but disabled through procfs (flag in NS).
652          *
653          * This distinguishes from a case when ELC is not supported originally,
654          * when we still want to cancel locks in advance and just cancel them
655          * locally, without sending any RPC. */
656         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
657                 RETURN(0);
658
659         ostid_build_res_name(&oa->o_oi, &res_id);
660         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
661         if (IS_ERR(res))
662                 RETURN(0);
663
664         LDLM_RESOURCE_ADDREF(res);
665         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
666                                            lock_flags, 0, NULL);
667         LDLM_RESOURCE_DELREF(res);
668         ldlm_resource_putref(res);
669         RETURN(count);
670 }
671
672 static int osc_destroy_interpret(const struct lu_env *env,
673                                  struct ptlrpc_request *req, void *data,
674                                  int rc)
675 {
676         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
677
678         atomic_dec(&cli->cl_destroy_in_flight);
679         wake_up(&cli->cl_destroy_waitq);
680         return 0;
681 }
682
/* Try to reserve a slot for one more in-flight destroy RPC.
 *
 * Returns 1 when the caller may send (the counter stayed within
 * cl_max_rpcs_in_flight after the increment) and 0 otherwise.  On
 * failure the increment is rolled back; if another thread changed the
 * counter between the two atomic operations, a waiter is woken so no
 * sender is left sleeping while a slot is actually free. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
700
701 int osc_create(const struct lu_env *env, struct obd_export *exp,
702                struct obdo *oa, struct lov_stripe_md **ea,
703                struct obd_trans_info *oti)
704 {
705         int rc = 0;
706         ENTRY;
707
708         LASSERT(oa);
709         LASSERT(ea);
710         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
711
712         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
713             oa->o_flags == OBD_FL_RECREATE_OBJS) {
714                 RETURN(osc_real_create(exp, oa, ea, oti));
715         }
716
717         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
718                 RETURN(osc_real_create(exp, oa, ea, oti));
719
720         /* we should not get here anymore */
721         LBUG();
722
723         RETURN(rc);
724 }
725
726 /* Destroy requests can be async always on the client, and we don't even really
727  * care about the return code since the client cannot do anything at all about
728  * a destroy failure.
729  * When the MDS is unlinking a filename, it saves the file objects into a
730  * recovery llog, and these object records are cancelled when the OST reports
731  * they were destroyed and sync'd to disk (i.e. transaction committed).
732  * If the client dies, or the OST is down when the object should be destroyed,
733  * the records are not cancelled, and when the OST reconnects to the MDS next,
734  * it will retrieve the llog unlink logs and then sends the log cancellation
735  * cookies to the MDS after committing destroy transactions. */
736 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
737                        struct obdo *oa, struct lov_stripe_md *ea,
738                        struct obd_trans_info *oti, struct obd_export *md_export,
739                        void *capa)
740 {
741         struct client_obd     *cli = &exp->exp_obd->u.cli;
742         struct ptlrpc_request *req;
743         struct ost_body       *body;
744         struct list_head       cancels = LIST_HEAD_INIT(cancels);
745         int rc, count;
746         ENTRY;
747
748         if (!oa) {
749                 CDEBUG(D_INFO, "oa NULL\n");
750                 RETURN(-EINVAL);
751         }
752
753         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
754                                         LDLM_FL_DISCARD_DATA);
755
756         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
757         if (req == NULL) {
758                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
759                 RETURN(-ENOMEM);
760         }
761
762         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
763         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
764                                0, &cancels, count);
765         if (rc) {
766                 ptlrpc_request_free(req);
767                 RETURN(rc);
768         }
769
770         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
771         ptlrpc_at_set_req_timeout(req);
772
773         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
774                 oa->o_lcookie = *oti->oti_logcookies;
775         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
776         LASSERT(body);
777         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
778
779         osc_pack_capa(req, body, (struct obd_capa *)capa);
780         ptlrpc_request_set_replen(req);
781
782         /* If osc_destory is for destroying the unlink orphan,
783          * sent from MDT to OST, which should not be blocked here,
784          * because the process might be triggered by ptlrpcd, and
785          * it is not good to block ptlrpcd thread (b=16006)*/
786         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
787                 req->rq_interpret_reply = osc_destroy_interpret;
788                 if (!osc_can_send_destroy(cli)) {
789                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
790                                                           NULL);
791
792                         /*
793                          * Wait until the number of on-going destroy RPCs drops
794                          * under max_rpc_in_flight
795                          */
796                         l_wait_event_exclusive(cli->cl_destroy_waitq,
797                                                osc_can_send_destroy(cli), &lwi);
798                 }
799         }
800
801         /* Do not wait for response */
802         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
803         RETURN(0);
804 }
805
/* Fill the dirty/grant accounting fields of @oa so the server learns how
 * much dirty cache the client holds (o_dirty), how much extra grant it
 * wants (o_undirty), its current grant (o_grant) and the grant it lost
 * (o_dropped).  Takes cl_loi_list_lock internally. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* Caller must not have filled these fields already. */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
                     cli->cl_dirty_max)) {
                /* Per-OSC dirty accounting out of bounds: ask for nothing. */
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_read(&obd_dirty_pages) -
                            atomic_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() allowing the atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %d - %d > system dirty_max %d\n",
                       cli->cl_import->imp_obd->obd_name,
                       atomic_read(&obd_dirty_pages),
                       atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
                /* An undirty request this large would overflow the wire
                 * field; ask for nothing rather than garbage. */
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                /* Ask for enough grant to keep the RPC pipe full: one full
                 * RPC per in-flight slot plus one, but at least dirty_max. */
                long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      PAGE_CACHE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
851
852 void osc_update_next_shrink(struct client_obd *cli)
853 {
854         cli->cl_next_shrink_grant =
855                 cfs_time_shift(cli->cl_grant_shrink_interval);
856         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
857                cli->cl_next_shrink_grant);
858 }
859
860 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
861 {
862         client_obd_list_lock(&cli->cl_loi_list_lock);
863         cli->cl_avail_grant += grant;
864         client_obd_list_unlock(&cli->cl_loi_list_lock);
865 }
866
867 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
868 {
869         if (body->oa.o_valid & OBD_MD_FLGRANT) {
870                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
871                 __osc_update_grant(cli, body->oa.o_grant);
872         }
873 }
874
875 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
876                               obd_count keylen, void *key, obd_count vallen,
877                               void *val, struct ptlrpc_request_set *set);
878
879 static int osc_shrink_grant_interpret(const struct lu_env *env,
880                                       struct ptlrpc_request *req,
881                                       void *aa, int rc)
882 {
883         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
884         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
885         struct ost_body *body;
886
887         if (rc != 0) {
888                 __osc_update_grant(cli, oa->o_grant);
889                 GOTO(out, rc);
890         }
891
892         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
893         LASSERT(body);
894         osc_update_grant(cli, body);
895 out:
896         OBDO_FREE(oa);
897         return rc;
898 }
899
900 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
901 {
902         client_obd_list_lock(&cli->cl_loi_list_lock);
903         oa->o_grant = cli->cl_avail_grant / 4;
904         cli->cl_avail_grant -= oa->o_grant;
905         client_obd_list_unlock(&cli->cl_loi_list_lock);
906         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
907                 oa->o_valid |= OBD_MD_FLFLAGS;
908                 oa->o_flags = 0;
909         }
910         oa->o_flags |= OBD_FL_SHRINK_GRANT;
911         osc_update_next_shrink(cli);
912 }
913
914 /* Shrink the current grant, either from some large amount to enough for a
915  * full set of in-flight RPCs, or if we have already shrunk to that limit
916  * then to enough for a single RPC.  This avoids keeping more grant than
917  * needed, and avoids shrinking the grant piecemeal. */
918 static int osc_shrink_grant(struct client_obd *cli)
919 {
920         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
921                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
922
923         client_obd_list_lock(&cli->cl_loi_list_lock);
924         if (cli->cl_avail_grant <= target_bytes)
925                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
926         client_obd_list_unlock(&cli->cl_loi_list_lock);
927
928         return osc_shrink_grant_to_target(cli, target_bytes);
929 }
930
/**
 * Shrink the client's available grant down to \a target_bytes, returning
 * the excess to the server via a KEY_GRANT_SHRINK set_info RPC.
 *
 * \param cli           client whose grant is being shrunk
 * \param target_bytes  desired grant to keep; silently raised to at least
 *                      one full RPC worth of bytes
 *
 * \retval 0            success, or nothing to shrink
 * \retval -ENOMEM      temporary ost_body allocation failed
 * \retval negative     other errno from osc_set_info_async()
 */
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        /* fill in dirty/grant accounting fields of the obdo */
        osc_announce_cached(cli, &body->oa, 0);

        /* NOTE(review): the lock was dropped above, so cl_avail_grant may
         * have changed; if it fell below target_bytes in the meantime the
         * subtraction below would wrap the unsigned o_grant — presumably
         * rare, but verify against concurrent grant consumers. */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        /* hand the excess to the server; on failure restore it locally */
        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}
975
976 static int osc_should_shrink_grant(struct client_obd *client)
977 {
978         cfs_time_t time = cfs_time_current();
979         cfs_time_t next_shrink = client->cl_next_shrink_grant;
980
981         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
982              OBD_CONNECT_GRANT_SHRINK) == 0)
983                 return 0;
984
985         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
986                 /* Get the current RPC size directly, instead of going via:
987                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
988                  * Keep comment here so that it can be found by searching. */
989                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
990
991                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
992                     client->cl_avail_grant > brw_size)
993                         return 1;
994                 else
995                         osc_update_next_shrink(client);
996         }
997         return 0;
998 }
999
1000 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1001 {
1002         struct client_obd *client;
1003
1004         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
1005                 if (osc_should_shrink_grant(client))
1006                         osc_shrink_grant(client);
1007         }
1008         return 0;
1009 }
1010
1011 static int osc_add_shrink_grant(struct client_obd *client)
1012 {
1013         int rc;
1014
1015         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1016                                        TIMEOUT_GRANT,
1017                                        osc_grant_shrink_grant_cb, NULL,
1018                                        &client->cl_grant_shrink_list);
1019         if (rc) {
1020                 CERROR("add grant client %s error %d\n",
1021                         client->cl_import->imp_obd->obd_name, rc);
1022                 return rc;
1023         }
1024         CDEBUG(D_CACHE, "add grant client %s \n",
1025                client->cl_import->imp_obd->obd_name);
1026         osc_update_next_shrink(client);
1027         return 0;
1028 }
1029
1030 static int osc_del_shrink_grant(struct client_obd *client)
1031 {
1032         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1033                                          TIMEOUT_GRANT);
1034 }
1035
/* Initialize cl_avail_grant from the grant returned by the server at
 * connect time, derive the chunk size used by osc_extent, and enable
 * periodic grant shrinking if the server supports it. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty will drop
         * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

        /* a negative result means the server granted less than our dirty
         * count; fall back to the full connect-time grant */
        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
                "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

        /* register for periodic grant shrinking if supported and not
         * already on the shrink list */
        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
1073
1074 /* We assume that the reason this OSC got a short read is because it read
1075  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1076  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1077  * this stripe never got written at or beyond this stripe offset yet. */
1078 static void handle_short_read(int nob_read, obd_count page_count,
1079                               struct brw_page **pga)
1080 {
1081         char *ptr;
1082         int i = 0;
1083
1084         /* skip bytes read OK */
1085         while (nob_read > 0) {
1086                 LASSERT (page_count > 0);
1087
1088                 if (pga[i]->count > nob_read) {
1089                         /* EOF inside this page */
1090                         ptr = kmap(pga[i]->pg) +
1091                                 (pga[i]->off & ~CFS_PAGE_MASK);
1092                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1093                         kunmap(pga[i]->pg);
1094                         page_count--;
1095                         i++;
1096                         break;
1097                 }
1098
1099                 nob_read -= pga[i]->count;
1100                 page_count--;
1101                 i++;
1102         }
1103
1104         /* zero remaining pages */
1105         while (page_count-- > 0) {
1106                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1107                 memset(ptr, 0, pga[i]->count);
1108                 kunmap(pga[i]->pg);
1109                 i++;
1110         }
1111 }
1112
1113 static int check_write_rcs(struct ptlrpc_request *req,
1114                            int requested_nob, int niocount,
1115                            obd_count page_count, struct brw_page **pga)
1116 {
1117         int     i;
1118         __u32   *remote_rcs;
1119
1120         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1121                                                   sizeof(*remote_rcs) *
1122                                                   niocount);
1123         if (remote_rcs == NULL) {
1124                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1125                 return(-EPROTO);
1126         }
1127
1128         /* return error if any niobuf was in error */
1129         for (i = 0; i < niocount; i++) {
1130                 if ((int)remote_rcs[i] < 0)
1131                         return(remote_rcs[i]);
1132
1133                 if (remote_rcs[i] != 0) {
1134                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1135                                 i, remote_rcs[i], req);
1136                         return(-EPROTO);
1137                 }
1138         }
1139
1140         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1141                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1142                        req->rq_bulk->bd_nob_transferred, requested_nob);
1143                 return(-EPROTO);
1144         }
1145
1146         return (0);
1147 }
1148
1149 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1150 {
1151         if (p1->flag != p2->flag) {
1152                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1153                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1154                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1155
1156                 /* warn if we try to combine flags that we don't know to be
1157                  * safe to combine */
1158                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1159                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1160                               "report this at http://bugs.whamcloud.com/\n",
1161                               p1->flag, p2->flag);
1162                 }
1163                 return 0;
1164         }
1165
1166         return (p1->off + p1->count == p2->off);
1167 }
1168
/*
 * Compute the bulk-data checksum over up to @nob bytes of the @pg_count
 * pages in @pga, using the algorithm selected by @cksum_type.
 *
 * NOTE(review): on hash-init failure this returns PTR_ERR() through the
 * unsigned obd_count return type, so callers see a large positive value
 * rather than a negative errno — verify callers cope with this.
 * NOTE(review): the return value of cfs_crypto_hash_final() ('err') is
 * never checked; a failed finalization would yield a stale cksum.
 */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32                           cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        int                             err;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                /* hash only the bytes of the final page that fall within nob */
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~CFS_PAGE_MASK;

                        memcpy(ptr + off, "bad1", min(4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~CFS_PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~CFS_PAGE_MASK));

                /* nob is reduced by the full page count (not 'count'), so it
                 * may go non-positive here and terminate the loop */
                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1223
/*
 * Build (but do not send) a bulk read or write RPC for @page_count pages.
 *
 * Allocates the request (from the pre-allocated pool for writes), merges
 * contiguous same-flag pages into niobufs, registers the bulk descriptor,
 * fills in grant/dirty accounting and optional bulk checksums, and stores
 * the per-request async args.  On success *reqp holds the prepared
 * request and 0 is returned; on failure a negative errno is returned and
 * no request is left allocated.
 *
 * Note: @oa is stashed in the async args (aa_oa) — ownership effectively
 * passes to the request's completion path.
 */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        /* writes draw from the reserved request pool so they can always
         * make progress under memory pressure; reads allocate normally */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* count how many niobufs we need: adjacent mergeable pages share
         * one.  Starts at 1, so this assumes page_count >= 1 (asserted
         * below before the fill loop). */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
                OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
         * that might be send for this request.  The actual number is decided
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        /* pin each page into the bulk descriptor and fill the niobuf
         * array, merging contiguous same-flag pages as counted above */
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        /* piggyback a grant shrink onto this RPC if one is due */
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                /* for reads, ask the server to checksum what it sends back */
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1431
/*
 * Diagnose a client/server checksum mismatch on an OST write by
 * recomputing the checksum over the local pages and comparing it against
 * both sides' values to infer where the data changed.
 *
 * \retval 0  server and client checksums actually agree (no error)
 * \retval 1  genuine mismatch; a console error has been logged
 */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* recompute with the type the server actually used, which may
         * differ from the one we originally requested */
        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
1476
1477 /* Note rc enters this function as number of bytes transferred */
1478 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1479 {
1480         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1481         const lnet_process_id_t *peer =
1482                         &req->rq_import->imp_connection->c_peer;
1483         struct client_obd *cli = aa->aa_cli;
1484         struct ost_body *body;
1485         __u32 client_cksum = 0;
1486         ENTRY;
1487
1488         if (rc < 0 && rc != -EDQUOT) {
1489                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1490                 RETURN(rc);
1491         }
1492
1493         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1494         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1495         if (body == NULL) {
1496                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1497                 RETURN(-EPROTO);
1498         }
1499
1500         /* set/clear over quota flag for a uid/gid */
1501         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1502             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1503                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1504
1505                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1506                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1507                        body->oa.o_flags);
1508                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1509         }
1510
1511         osc_update_grant(cli, body);
1512
1513         if (rc < 0)
1514                 RETURN(rc);
1515
1516         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1517                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1518
1519         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1520                 if (rc > 0) {
1521                         CERROR("Unexpected +ve rc %d\n", rc);
1522                         RETURN(-EPROTO);
1523                 }
1524                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1525
1526                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1527                         RETURN(-EAGAIN);
1528
1529                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1530                     check_write_checksum(&body->oa, peer, client_cksum,
1531                                          body->oa.o_cksum, aa->aa_requested_nob,
1532                                          aa->aa_page_count, aa->aa_ppga,
1533                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1534                         RETURN(-EAGAIN);
1535
1536                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1537                                      aa->aa_page_count, aa->aa_ppga);
1538                 GOTO(out, rc);
1539         }
1540
1541         /* The rest of this function executes only for OST_READs */
1542
1543         /* if unwrap_bulk failed, return -EAGAIN to retry */
1544         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1545         if (rc < 0)
1546                 GOTO(out, rc = -EAGAIN);
1547
1548         if (rc > aa->aa_requested_nob) {
1549                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1550                        aa->aa_requested_nob);
1551                 RETURN(-EPROTO);
1552         }
1553
1554         if (rc != req->rq_bulk->bd_nob_transferred) {
1555                 CERROR ("Unexpected rc %d (%d transferred)\n",
1556                         rc, req->rq_bulk->bd_nob_transferred);
1557                 return (-EPROTO);
1558         }
1559
1560         if (rc < aa->aa_requested_nob)
1561                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1562
1563         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1564                 static int cksum_counter;
1565                 __u32      server_cksum = body->oa.o_cksum;
1566                 char      *via;
1567                 char      *router;
1568                 cksum_type_t cksum_type;
1569
1570                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1571                                                body->oa.o_flags : 0);
1572                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1573                                                  aa->aa_ppga, OST_READ,
1574                                                  cksum_type);
1575
1576                 if (peer->nid == req->rq_bulk->bd_sender) {
1577                         via = router = "";
1578                 } else {
1579                         via = " via ";
1580                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1581                 }
1582
1583                 if (server_cksum != client_cksum) {
1584                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1585                                            "%s%s%s inode "DFID" object "DOSTID
1586                                            " extent ["LPU64"-"LPU64"]\n",
1587                                            req->rq_import->imp_obd->obd_name,
1588                                            libcfs_nid2str(peer->nid),
1589                                            via, router,
1590                                            body->oa.o_valid & OBD_MD_FLFID ?
1591                                                 body->oa.o_parent_seq : (__u64)0,
1592                                            body->oa.o_valid & OBD_MD_FLFID ?
1593                                                 body->oa.o_parent_oid : 0,
1594                                            body->oa.o_valid & OBD_MD_FLFID ?
1595                                                 body->oa.o_parent_ver : 0,
1596                                            POSTID(&body->oa.o_oi),
1597                                            aa->aa_ppga[0]->off,
1598                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1599                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1600                                                                         1);
1601                         CERROR("client %x, server %x, cksum_type %x\n",
1602                                client_cksum, server_cksum, cksum_type);
1603                         cksum_counter = 0;
1604                         aa->aa_oa->o_cksum = client_cksum;
1605                         rc = -EAGAIN;
1606                 } else {
1607                         cksum_counter++;
1608                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1609                         rc = 0;
1610                 }
1611         } else if (unlikely(client_cksum)) {
1612                 static int cksum_missed;
1613
1614                 cksum_missed++;
1615                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1616                         CERROR("Checksum %u requested from %s but not sent\n",
1617                                cksum_missed, libcfs_nid2str(peer->nid));
1618         } else {
1619                 rc = 0;
1620         }
1621 out:
1622         if (rc >= 0)
1623                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1624                                      aa->aa_oa, &body->oa);
1625
1626         RETURN(rc);
1627 }
1628
/**
 * Rebuild and resend a BRW RPC that failed with a recoverable error
 * (e.g. -EINPROGRESS or a checksum mismatch).
 *
 * A fresh request is prepared from the async args of the failed one; the
 * pga, oaps and extents are moved (not copied) onto the new request, and
 * the new request is handed to a ptlrpcd thread for asynchronous resend.
 *
 * \param request  the failed BRW request being redone
 * \param aa       async args stored in \a request
 * \param rc       the error that triggered the redo (used for log level)
 *
 * \retval 0        new request queued successfully
 * \retval -EINTR   a page of the transfer was interrupted; redo abandoned
 * \retval negative errno if the replacement request could not be prepared
 */
static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        ENTRY;

        /* -EINPROGRESS resends are expected, so log them quietly */
        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0, 1);
        if (rc)
                RETURN(rc);

        /* bail out before transferring state if any page was interrupted */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_req->rq_commit_cb = request->rq_commit_cb;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
        else
                new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        /* move (not copy) the page and extent lists to the new async args */
        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&new_aa->aa_exts);
        list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;

        /* repoint each oap's request reference from the old req to the new */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* capability ownership moves to the new request's async args */
        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* XXX: This code will run into problem if we're going to support
         * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
         * and wait for all of them to be finished. We should inherit request
         * set from old request. */
        ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1702
1703 /*
1704  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1705  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1706  * fine for our small page arrays and doesn't require allocation.  its an
1707  * insertion sort that swaps elements that are strides apart, shrinking the
1708  * stride down until its '1' and the array is sorted.
1709  */
1710 static void sort_brw_pages(struct brw_page **array, int num)
1711 {
1712         int stride, i, j;
1713         struct brw_page *tmp;
1714
1715         if (num == 1)
1716                 return;
1717         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1718                 ;
1719
1720         do {
1721                 stride /= 3;
1722                 for (i = stride ; i < num ; i++) {
1723                         tmp = array[i];
1724                         j = i;
1725                         while (j >= stride && array[j - stride]->off > tmp->off) {
1726                                 array[j] = array[j - stride];
1727                                 j -= stride;
1728                         }
1729                         array[j] = tmp;
1730                 }
1731         } while (stride > 1);
1732 }
1733
1734 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1735 {
1736         LASSERT(ppga != NULL);
1737         OBD_FREE(ppga, sizeof(*ppga) * count);
1738 }
1739
/**
 * Interpret callback for a BRW (bulk read/write) RPC.
 *
 * Finalizes the bulk transfer, resends recoverable failures, propagates
 * server-returned attributes (size/blocks/times, KMS) into the cl_object,
 * and finishes/releases the extents and pages that made up this RPC.
 *
 * \param env   lu environment of the interpreting (ptlrpcd) thread
 * \param req   the completed BRW request
 * \param data  osc_brw_async_args stored in \a req
 * \param rc    RPC status from ptlrpc
 *
 * \retval 0 on success, negative errno on unrecoverable failure
 */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct client_obd *cli = aa->aa_cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* When server return -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already. */
        if (osc_recoverable_error(rc)) {
                if (req->rq_import_generation !=
                    req->rq_import->imp_generation) {
                        /* import was evicted/reconnected since this request
                         * was sent; do not resend across the eviction */
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""DOSTID", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                } else if (rc == -EINPROGRESS ||
                    client_should_resend(aa->aa_resends, aa->aa_cli)) {
                        rc = osc_brw_redo_request(req, aa, rc);
                } else {
                        CERROR("%s: too many resent retries for object: "
                               ""LPU64":"LPU64", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                }

                if (rc == 0)
                        /* redo queued; this request's state moved to it */
                        RETURN(0);
                else if (rc == -EAGAIN || rc == -EINPROGRESS)
                        rc = -EIO;
        }

        /* drop the capability reference held for the transfer */
        if (aa->aa_ocapa) {
                capa_put(aa->aa_ocapa);
                aa->aa_ocapa = NULL;
        }

        if (rc == 0) {
                struct obdo *oa = aa->aa_oa;
                struct cl_attr *attr = &osc_env_info(env)->oti_attr;
                unsigned long valid = 0;
                struct cl_object *obj;
                struct osc_async_page *last;

                /* all pages belong to one object (single-object RPCs),
                 * so the last page's object is the RPC's object */
                last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
                obj = osc2cl(last->oap_obj);

                /* fold the attributes the server returned into the
                 * client-side object attributes, under the attr lock */
                cl_object_attr_lock(obj);
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        valid |= CAT_BLOCKS;
                }
                if (oa->o_valid & OBD_MD_FLMTIME) {
                        attr->cat_mtime = oa->o_mtime;
                        valid |= CAT_MTIME;
                }
                if (oa->o_valid & OBD_MD_FLATIME) {
                        attr->cat_atime = oa->o_atime;
                        valid |= CAT_ATIME;
                }
                if (oa->o_valid & OBD_MD_FLCTIME) {
                        attr->cat_ctime = oa->o_ctime;
                        valid |= CAT_CTIME;
                }

                if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                        struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
                        loff_t last_off = last->oap_count + last->oap_obj_off;

                        /* Change file size if this is an out of quota or
                         * direct IO write and it extends the file size */
                        if (loi->loi_lvb.lvb_size < last_off) {
                                attr->cat_size = last_off;
                                valid |= CAT_SIZE;
                        }
                        /* Extend KMS if it's not a lockless write */
                        if (loi->loi_kms < last_off &&
                            oap2osc_page(last)->ops_srvlock == 0) {
                                attr->cat_kms = last_off;
                                valid |= CAT_KMS;
                        }
                }

                if (valid != 0)
                        cl_object_attr_set(env, obj, attr, valid);
                cl_object_attr_unlock(obj);
        }
        OBDO_FREE(aa->aa_oa);

        /* successful writes pin pages as "unstable" until commit (see
         * brw_commit()) */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
                osc_inc_unstable_pages(req);

        /* finish every extent that was part of this RPC */
        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                list_del_init(&ext->oe_link);
                osc_extent_finish(env, ext, 1, rc);
        }
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));

        cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
                          req->rq_bulk->bd_nob_transferred);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;
        osc_wake_cache_waiters(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        /* an RPC slot just freed up: kick off any queued IO */
        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
        RETURN(rc);
}
1862
1863 static void brw_commit(struct ptlrpc_request *req)
1864 {
1865         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1866          * this called via the rq_commit_cb, I need to ensure
1867          * osc_dec_unstable_pages is still called. Otherwise unstable
1868          * pages may be leaked. */
1869         spin_lock(&req->rq_lock);
1870         if (likely(req->rq_unstable)) {
1871                 req->rq_unstable = 0;
1872                 spin_unlock(&req->rq_lock);
1873
1874                 osc_dec_unstable_pages(req);
1875         } else {
1876                 req->rq_committed = 1;
1877                 spin_unlock(&req->rq_lock);
1878         }
1879 }
1880
/**
 * Build an RPC by the list of extent @ext_list. The caller must ensure
 * that the total pages in this list are NOT over max pages per RPC.
 * Extents in the list must be in OES_RPC state.
 *
 * On success the request is handed to a ptlrpcd thread; on failure all
 * extents in \a ext_list are finished with the error and all temporary
 * allocations are released.
 *
 * \param env       lu environment of the caller
 * \param cli       client obd the RPC is built for
 * \param ext_list  list of OES_RPC extents; emptied by this call
 * \param cmd       OBD_BRW_WRITE or OBD_BRW_READ (possibly with flags)
 * \param pol       ptlrpcd scheduling policy for the request
 *
 * \retval 0 on success, negative errno on failure
 */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                  struct list_head *ext_list, int cmd, pdl_policy_t pol)
{
        struct ptlrpc_request           *req = NULL;
        struct osc_extent               *ext;
        struct brw_page                 **pga = NULL;
        struct osc_brw_async_args       *aa = NULL;
        struct obdo                     *oa = NULL;
        struct osc_async_page           *oap;
        struct osc_async_page           *tmp;
        struct cl_req                   *clerq = NULL;
        enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
                                                                      CRT_READ;
        struct cl_req_attr              *crattr = NULL;
        obd_off                         starting_offset = OBD_OBJECT_EOF;
        obd_off                         ending_offset = 0;
        int                             mpflag = 0;
        int                             mem_tight = 0;
        int                             page_count = 0;
        bool                            soft_sync = false;
        int                             i;
        int                             rc;
        struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);

        ENTRY;
        LASSERT(!list_empty(ext_list));

        /* add pages into rpc_list to build BRW rpc */
        list_for_each_entry(ext, ext_list, oe_link) {
                LASSERT(ext->oe_state == OES_RPC);
                mem_tight |= ext->oe_memalloc;
                list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                        ++page_count;
                        list_add_tail(&oap->oap_rpc_item, &rpc_list);
                        /* track the [start, end) byte range of the RPC; any
                         * interior page must be a full page */
                        if (starting_offset > oap->oap_obj_off)
                                starting_offset = oap->oap_obj_off;
                        else
                                LASSERT(oap->oap_page_off == 0);
                        if (ending_offset < oap->oap_obj_off + oap->oap_count)
                                ending_offset = oap->oap_obj_off +
                                                oap->oap_count;
                        else
                                LASSERT(oap->oap_page_off + oap->oap_count ==
                                        PAGE_CACHE_SIZE);
                }
        }

        soft_sync = osc_over_unstable_soft_limit(cli);
        if (mem_tight)
                /* allow allocations from the emergency pools while building
                 * this memory-pressure writeback RPC */
                mpflag = cfs_memory_pressure_get_and_set();

        OBD_ALLOC(crattr, sizeof(*crattr));
        if (crattr == NULL)
                GOTO(out, rc = -ENOMEM);

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                GOTO(out, rc = -ENOMEM);

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, rc = -ENOMEM);

        /* fill the brw_page array and attach each cl_page to the cl_req */
        i = 0;
        list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
                struct cl_page *page = oap2cl_page(oap);
                if (clerq == NULL) {
                        clerq = cl_req_alloc(env, page, crt,
                                             1 /* only 1-object rpcs for now */);
                        if (IS_ERR(clerq))
                                GOTO(out, rc = PTR_ERR(clerq));
                }
                if (mem_tight)
                        oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
                if (soft_sync)
                        oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, page_index(oap->oap_page), oap,
                       pga[i]->flag);
                i++;
                cl_req_page_add(env, clerq, page);
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(clerq != NULL);
        crattr->cra_oa = oa;
        cl_req_attr_set(env, clerq, crattr, ~0ULL);

        rc = cl_req_prep(env, clerq);
        if (rc != 0) {
                CERROR("cl_req_prep failed: %d\n", rc);
                GOTO(out, rc);
        }

        /* sort pages by offset so the OST can allocate blocks in order */
        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                        pga, &req, crattr->cra_capa, 1, 0);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, rc);
        }

        req->rq_commit_cb = brw_commit;
        req->rq_interpret_reply = brw_interpret;

        if (mem_tight != 0)
                req->rq_memalloc = 1;

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        cl_req_attr_set(env, clerq, crattr,
                        OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);

        lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice_init(&rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_exts);
        list_splice_init(ext_list, &aa->aa_exts);
        aa->aa_clerq = clerq;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                                        oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        /* account the in-flight RPC and feed the lprocfs histograms */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        starting_offset >>= PAGE_CACHE_SHIFT;
        if (cmd == OBD_BRW_READ) {
                cli->cl_r_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      starting_offset + 1);
        } else {
                cli->cl_w_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      starting_offset + 1);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight,
                  cli->cl_w_in_flight);

        /* XXX: Maybe the caller can check the RPC bulk descriptor to
         * see which CPU/NUMA node the majority of pages were allocated
         * on, and try to assign the async RPC to the CPU core
         * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
         *
         * But on the other hand, we expect that multiple ptlrpcd
         * threads and the initial write sponsor can run in parallel,
         * especially when data checksum is enabled, which is CPU-bound
         * operation and single ptlrpcd thread cannot process in time.
         * So more ptlrpcd threads sharing BRW load
         * (with PDL_POLICY_ROUND) seems better.
         */
        ptlrpcd_add_req(req, pol, -1);
        rc = 0;
        EXIT;

out:
        if (mem_tight != 0)
                cfs_memory_pressure_restore(mpflag);

        if (crattr != NULL) {
                capa_put(crattr->cra_capa);
                OBD_FREE(crattr, sizeof(*crattr));
        }

        if (rc != 0) {
                LASSERT(req == NULL);

                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                while (!list_empty(ext_list)) {
                        ext = list_entry(ext_list->next, struct osc_extent,
                                         oe_link);
                        list_del_init(&ext->oe_link);
                        osc_extent_finish(env, ext, 0, rc);
                }
                if (clerq && !IS_ERR(clerq))
                        cl_req_completion(env, clerq, rc);
        }
        RETURN(rc);
}
2096
2097 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2098                                         struct ldlm_enqueue_info *einfo)
2099 {
2100         void *data = einfo->ei_cbdata;
2101         int set = 0;
2102
2103         LASSERT(lock != NULL);
2104         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2105         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2106         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2107         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2108
2109         lock_res_and_lock(lock);
2110         spin_lock(&osc_ast_guard);
2111
2112         if (lock->l_ast_data == NULL)
2113                 lock->l_ast_data = data;
2114         if (lock->l_ast_data == data)
2115                 set = 1;
2116
2117         spin_unlock(&osc_ast_guard);
2118         unlock_res_and_lock(lock);
2119
2120         return set;
2121 }
2122
2123 static int osc_set_data_with_check(struct lustre_handle *lockh,
2124                                    struct ldlm_enqueue_info *einfo)
2125 {
2126         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2127         int set = 0;
2128
2129         if (lock != NULL) {
2130                 set = osc_set_lock_data_with_check(lock, einfo);
2131                 LDLM_LOCK_PUT(lock);
2132         } else
2133                 CERROR("lockh %p, data %p - client evicted?\n",
2134                        lockh, einfo->ei_cbdata);
2135         return set;
2136 }
2137
2138 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2139                              ldlm_iterator_t replace, void *data)
2140 {
2141         struct ldlm_res_id res_id;
2142         struct obd_device *obd = class_exp2obd(exp);
2143
2144         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2145         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2146         return 0;
2147 }
2148
2149 /* find any ldlm lock of the inode in osc
2150  * return 0    not find
2151  *        1    find one
2152  *      < 0    error */
2153 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2154                            ldlm_iterator_t replace, void *data)
2155 {
2156         struct ldlm_res_id res_id;
2157         struct obd_device *obd = class_exp2obd(exp);
2158         int rc = 0;
2159
2160         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2161         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2162         if (rc == LDLM_ITER_STOP)
2163                 return(1);
2164         if (rc == LDLM_ITER_CONTINUE)
2165                 return(0);
2166         return(rc);
2167 }
2168
/**
 * Common post-processing for an OSC lock enqueue.
 *
 * Translates an intent-enqueue ELDLM_LOCK_ABORTED result into the real
 * server status carried in the DLM reply, marks the LVB ready when
 * appropriate, and finally invokes the caller's \a upcall with the
 * resulting status.
 *
 * \param req     the enqueue request (holds the DLM reply for intents)
 * \param lvb     lock value block returned by the server
 * \param upcall  caller's completion callback
 * \param cookie  opaque argument passed through to \a upcall
 * \param flags   in/out enqueue flags; LDLM_FL_LVB_READY may be added
 * \param agl     non-zero for asynchronous glimpse locks
 * \param rc      enqueue status so far
 *
 * \retval the value returned by \a upcall
 */
static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
                            obd_enqueue_update_f upcall, void *cookie,
                            __u64 *flags, int agl, int rc)
{
        int intent = *flags & LDLM_FL_HAS_INTENT;
        ENTRY;

        if (intent) {
                /* The request was created before ldlm_cli_enqueue call. */
                if (rc == ELDLM_LOCK_ABORTED) {
                        struct ldlm_reply *rep;
                        rep = req_capsule_server_get(&req->rq_pill,
                                                     &RMF_DLM_REP);

                        LASSERT(rep != NULL);
                        /* the real status is in lock_policy_res1, in
                         * network byte order */
                        rep->lock_policy_res1 =
                                ptlrpc_status_ntoh(rep->lock_policy_res1);
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
        }

        /* a granted lock, or an aborted non-AGL intent enqueue, delivered
         * a usable LVB */
        if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
            (rc == 0)) {
                *flags |= LDLM_FL_LVB_READY;
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
        }

        /* Call the update callback. */
        rc = (*upcall)(cookie, rc);
        RETURN(rc);
}
2202
/**
 * Interpret callback for an asynchronous OSC lock enqueue.
 *
 * Completes the ldlm enqueue, runs osc's post-processing/upcall via
 * osc_enqueue_fini(), and carefully manages lock references so that any
 * blocking AST posted for a failed lock is guaranteed to arrive only
 * after the upcall has run.
 *
 * \param env  lu environment of the interpreting thread
 * \param req  the completed enqueue request
 * \param aa   enqueue arguments stored in \a req (may be freed as soon
 *             as the upcall runs -- hence the local copies below)
 * \param rc   RPC status from ptlrpc
 *
 * \retval result of osc_enqueue_fini()/the caller's upcall
 */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle handle;
        __u32 mode;
        struct ost_lvb *lvb;
        __u32 lvb_len;
        __u64 *flags = aa->oa_flags;

        /* Make a local copy of a lock handle and a mode, because aa->oa_*
         * might be freed anytime after lock upcall has been called. */
        lustre_handle_copy(&handle, aa->oa_lockh);
        mode = aa->oa_ei->ei_mode;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(&handle);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(&handle, mode);

        /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);

        /* Let CP AST to grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

        /* an aborted AGL enqueue carries no LVB to unpack */
        if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
                lvb = NULL;
                lvb_len = 0;
        } else {
                lvb = aa->oa_lvb;
                lvb_len = sizeof(*aa->oa_lvb);
        }

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   mode, flags, lvb, lvb_len, &handle, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
                              flags, aa->oa_agl, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
                /*
                 * Releases a reference taken by ldlm_cli_enqueue(), if it is
                 * not already released by
                 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
                 */
                ldlm_lock_decref(&handle, mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_lockh, req, aa);
        /* drop the extra reference taken at the top of this function */
        ldlm_lock_decref(&handle, mode);
        LDLM_LOCK_PUT(lock);
        return rc;
}
2267
2268 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2269
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarios make the life difficult, so
 * release locks just after they are obtained. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, ldlm_policy_data_t *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct lustre_handle *lockh,
                     struct ptlrpc_request_set *rqset, int async, int agl)
{
        struct obd_device *obd = exp->exp_obd;
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        /* AGL (async glimpse) matches even not-yet-granted locks. */
        __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
                               einfo->ei_type, policy, mode, lockh, 0);
        if (mode) {
                struct ldlm_lock *matched = ldlm_handle2lock(lockh);

                if ((agl != 0) && !ldlm_is_lvb_ready(matched)) {
                        /* For AGL, if enqueue RPC is sent but the lock is not
                         * granted, then skip to process this stripe.
                         * Return -ECANCELED to tell the caller. */
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(-ECANCELED);
                } else if (osc_set_lock_data_with_check(matched, einfo)) {
                        *flags |= LDLM_FL_LVB_READY;
                        /* addref the lock only if not async requests and PW
                         * lock is matched whereas we asked for PR. */
                        if (!rqset && einfo->ei_mode != mode)
                                ldlm_lock_addref(lockh, LCK_PR);
                        if (intent) {
                                /* I would like to be able to ASSERT here that
                                 * rss <= kms, but I can't, for reasons which
                                 * are explained in lov_enqueue() */
                        }

                        /* We already have a lock, and it's referenced.
                         *
                         * At this point, the cl_lock::cll_state is CLS_QUEUING,
                         * AGL upcall may change it to CLS_HELD directly. */
                        (*upcall)(cookie, ELDLM_OK);

                        if (einfo->ei_mode != mode)
                                ldlm_lock_decref(lockh, LCK_PW);
                        else if (rqset)
                                /* For async requests, decref the lock. */
                                ldlm_lock_decref(lockh, einfo->ei_mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else {
                        /* Lock is unusable (stale data); drop the match ref. */
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
                }
        }

 no_match:
        if (intent) {
                /* Intent enqueue carries an LVB in the reply; size it now. */
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
                if (rc < 0) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof *lvb);
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), LVB_T_OST, lockh, async);
        if (rqset) {
                if (!rc) {
                        struct osc_enqueue_args *aa;
                        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;
                        aa->oa_flags  = flags;
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_lvb    = lvb;
                        aa->oa_lockh  = lockh;
                        aa->oa_agl    = !!agl;

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        /* Synchronous path: run the completion/upcall here. */
        rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2420
2421 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2422                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2423                    __u64 *flags, void *data, struct lustre_handle *lockh,
2424                    int unref)
2425 {
2426         struct obd_device *obd = exp->exp_obd;
2427         __u64 lflags = *flags;
2428         ldlm_mode_t rc;
2429         ENTRY;
2430
2431         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2432                 RETURN(-EIO);
2433
2434         /* Filesystem lock extents are extended to page boundaries so that
2435          * dealing with the page cache is a little smoother */
2436         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2437         policy->l_extent.end |= ~CFS_PAGE_MASK;
2438
2439         /* Next, search for already existing extent locks that will cover us */
2440         /* If we're trying to read, we also search for an existing PW lock.  The
2441          * VFS and page cache already protect us locally, so lots of readers/
2442          * writers can share a single PW lock. */
2443         rc = mode;
2444         if (mode == LCK_PR)
2445                 rc |= LCK_PW;
2446         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2447                              res_id, type, policy, rc, lockh, unref);
2448         if (rc) {
2449                 if (data != NULL) {
2450                         if (!osc_set_data_with_check(lockh, data)) {
2451                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2452                                         ldlm_lock_decref(lockh, rc);
2453                                 RETURN(0);
2454                         }
2455                 }
2456                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2457                         ldlm_lock_addref(lockh, LCK_PR);
2458                         ldlm_lock_decref(lockh, LCK_PW);
2459                 }
2460                 RETURN(rc);
2461         }
2462         RETURN(rc);
2463 }
2464
2465 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2466 {
2467         ENTRY;
2468
2469         if (unlikely(mode == LCK_GROUP))
2470                 ldlm_lock_decref_and_cancel(lockh, mode);
2471         else
2472                 ldlm_lock_decref(lockh, mode);
2473
2474         RETURN(0);
2475 }
2476
/* Interpret callback for an asynchronous OST_STATFS RPC: unpack the server's
 * obd_statfs into the caller's buffer and invoke the oi_cb_up completion
 * callback with the final status. */
static int osc_statfs_interpret(const struct lu_env *env,
                                struct ptlrpc_request *req,
                                struct osc_async_args *aa, int rc)
{
        struct obd_statfs *msfs;
        ENTRY;

        if (rc == -EBADR)
                /* The request has in fact never been sent
                 * due to issues at a higher level (LOV).
                 * Exit immediately since the caller is
                 * aware of the problem and takes care
                 * of the clean up */
                 RETURN(rc);

        /* NODELAY callers prefer a quick "no data" over waiting out
         * recovery, so connection errors are not propagated. */
        if ((rc == -ENOTCONN || rc == -EAGAIN) &&
            (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
                GOTO(out, rc = 0);

        if (rc != 0)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *aa->aa_oi->oi_osfs = *msfs;
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
2509
/* Send an OST_STATFS RPC asynchronously via @rqset; the reply is handled by
 * osc_statfs_interpret(), which fills oinfo->oi_osfs and calls oi_cb_up.
 * Note: @max_age is currently unused here (see comment below). */
static int osc_statfs_async(struct obd_export *exp,
                            struct obd_info *oinfo, __u64 max_age,
                            struct ptlrpc_request_set *rqset)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs callers must not block waiting for recovery,
                 * to avoid a deadlock: fail fast instead. */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
2553
/* Synchronous OST_STATFS: send the RPC, wait, and copy the server's
 * obd_statfs reply into @osfs.  @max_age is currently unused (see comment
 * below); @flags may carry OBD_STATFS_NODELAY to fail fast during recovery. */
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;

        /*Since the request might also come from lprocfs, so we need
         *sync this with client_disconnect_export Bug15684*/
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        /* The import reference was only needed for the allocation above. */
        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs callers must not block waiting for recovery,
                 * to avoid a deadlock: fail fast instead. */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
2617
2618 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2619                          void *karg, void *uarg)
2620 {
2621         struct obd_device *obd = exp->exp_obd;
2622         struct obd_ioctl_data *data = karg;
2623         int err = 0;
2624         ENTRY;
2625
2626         if (!try_module_get(THIS_MODULE)) {
2627                 CERROR("Can't get module. Is it alive?");
2628                 return -EINVAL;
2629         }
2630         switch (cmd) {
2631         case OBD_IOC_CLIENT_RECOVER:
2632                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2633                                             data->ioc_inlbuf1, 0);
2634                 if (err > 0)
2635                         err = 0;
2636                 GOTO(out, err);
2637         case IOC_OSC_SET_ACTIVE:
2638                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2639                                                data->ioc_offset);
2640                 GOTO(out, err);
2641         case OBD_IOC_POLL_QUOTACHECK:
2642                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2643                 GOTO(out, err);
2644         case OBD_IOC_PING_TARGET:
2645                 err = ptlrpc_obd_ping(obd);
2646                 GOTO(out, err);
2647         default:
2648                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2649                        cmd, current_comm());
2650                 GOTO(out, err = -ENOTTY);
2651         }
2652 out:
2653         module_put(THIS_MODULE);
2654         return err;
2655 }
2656
/* Handle obd_get_info() keys for the OSC:
 *   KEY_LOCK_TO_STRIPE - trivially returns stripe 0 (single-object OSC view);
 *   KEY_LAST_ID        - fetches the last allocated object id from the OST;
 *   KEY_FIEMAP         - forwards a FIEMAP request, taking a PR extent lock
 *                        (or asking for a server-side lock) when the caller
 *                        requested FIEMAP_FLAG_SYNC.
 * Any other key returns -EINVAL. */
static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
                        obd_count keylen, void *key, __u32 *vallen, void *val,
                        struct lov_stripe_md *lsm)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                struct ptlrpc_request *req;
                obd_id                *reply;
                char                  *tmp;
                int                    rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_LAST_ID);
                if (req == NULL)
                        RETURN(-ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                                     RCL_CLIENT, keylen);
                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                memcpy(tmp, key, keylen);

                /* Fail fast rather than wait out recovery. */
                req->rq_no_delay = req->rq_no_resend = 1;
                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
                if (reply == NULL)
                        GOTO(out, rc = -EPROTO);

                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        } else if (KEY_IS(KEY_FIEMAP)) {
                struct ll_fiemap_info_key *fm_key =
                                (struct ll_fiemap_info_key *)key;
                struct ldlm_res_id       res_id;
                ldlm_policy_data_t       policy;
                struct lustre_handle     lockh;
                ldlm_mode_t              mode = 0;
                struct ptlrpc_request   *req;
                struct ll_user_fiemap   *reply;
                char                    *tmp;
                int                      rc;

                if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
                        goto skip_locking;

                /* Round the FIEMAP range out to page boundaries for lock
                 * matching, same as the regular enqueue path. */
                policy.l_extent.start = fm_key->fiemap.fm_start &
                                                CFS_PAGE_MASK;

                /* Guard against overflow when rounding the end offset up. */
                if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
                    fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
                        policy.l_extent.end = OBD_OBJECT_EOF;
                else
                        policy.l_extent.end = (fm_key->fiemap.fm_start +
                                fm_key->fiemap.fm_length +
                                PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;

                ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
                mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
                                       LDLM_FL_BLOCK_GRANTED |
                                       LDLM_FL_LVB_READY,
                                       &res_id, LDLM_EXTENT, &policy,
                                       LCK_PR | LCK_PW, &lockh, 0);
                if (mode) { /* lock is cached on client */
                        if (mode != LCK_PR) {
                                /* Downgrade our reference to PR; only a read
                                 * lock is needed for FIEMAP. */
                                ldlm_lock_addref(&lockh, LCK_PR);
                                ldlm_lock_decref(&lockh, LCK_PW);
                        }
                } else { /* no cached lock, needs acquire lock on server side */
                        fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
                        fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
                }

skip_locking:
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_FIEMAP);
                if (req == NULL)
                        GOTO(drop_lock, rc = -ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
                                     RCL_CLIENT, keylen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_CLIENT, *vallen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_SERVER, *vallen);

                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        GOTO(drop_lock, rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
                memcpy(tmp, key, keylen);
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                memcpy(tmp, val, *vallen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(fini_req, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                if (reply == NULL)
                        GOTO(fini_req, rc = -EPROTO);

                memcpy(val, reply, *vallen);
fini_req:
                ptlrpc_req_finished(req);
drop_lock:
                if (mode)
                        ldlm_lock_decref(&lockh, LCK_PR);
                RETURN(rc);
        }

        RETURN(-EINVAL);
}
2792
/* Handle obd_set_info_async() keys for the OSC.  Checksum toggling, sptlrpc
 * configuration, context flushing, and LRU cache keys are handled locally;
 * everything else is packed into an OST_SET_INFO RPC and sent to the OST
 * (KEY_GRANT_SHRINK via ptlrpcd, others via the caller-supplied @set). */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              obd_count keylen, void *key, obd_count vallen,
                              void *val, struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_SET)) {
                struct client_obd *cli = &obd->u.cli;

                LASSERT(cli->cl_cache == NULL); /* only once */
                cli->cl_cache = (struct cl_client_cache *)val;
                atomic_inc(&cli->cl_cache->ccc_users);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

                /* add this osc into entity list */
                LASSERT(list_empty(&cli->cl_lru_osc));
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);

                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
                /* Shrink at most half of this OSC's LRU pages per call. */
                int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
                int target = *(int *)val;

                nr = osc_lru_shrink(env, cli, min(nr, target), true);
                /* Report back how many pages remain to be shrunk. */
                *(int *)val -= nr;
                RETURN(0);
        }

        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
                                                &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        if (!KEY_IS(KEY_GRANT_SHRINK))
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        /* Grant-shrink requests carry an ost_body instead of a raw value. */
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                /* Keep a private copy of the obdo for the reply interpreter;
                 * @val belongs to the caller. */
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);

        RETURN(0);
}
2910
/* Reconnect hook: recompute the grant to request from the OST based on the
 * currently available grant plus dirty pages, and fold any grant lost while
 * disconnected into the log message.
 * NOTE(review): this function uses RETURN() without a matching ENTRY —
 * presumably intentional, but confirm against the debug-macro conventions. */
static int osc_reconnect(const struct lu_env *env,
                         struct obd_export *exp, struct obd_device *obd,
                         struct obd_uuid *cluuid,
                         struct obd_connect_data *data,
                         void *localdata)
{
        struct client_obd *cli = &obd->u.cli;

        if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
                long lost_grant;

                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* Ask for at least two full BRW RPCs worth of grant. */
                data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
                                2 * cli_brw_size(obd);
                lost_grant = cli->cl_lost_grant;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
                       " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
                       data->ocd_version, data->ocd_grant, lost_grant);
        }

        RETURN(0);
}
2936
/* Disconnect from the OST: flush pending size-change llog cancels on the
 * last connection, disconnect the export, and finally unhook this client
 * from the grant-shrink pinger list (see ordering rationale below). */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct llog_ctxt  *ctxt;
        int rc;

        ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        if (ctxt) {
                if (obd->u.cli.cl_conn_count == 1) {
                        /* Flush any remaining cancel messages out to the
                         * target */
                        llog_sync(ctxt, exp, 0);
                }
                llog_ctxt_put(ctxt);
        } else {
                CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
                       obd);
        }

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! pinger trigger the shrink.
         * So the osc should be disconnected from the shrink list, after we
         * are sure the import has been destroyed. BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}
2978
2979 static int osc_import_event(struct obd_device *obd,
2980                             struct obd_import *imp,
2981                             enum obd_import_event event)
2982 {
2983         struct client_obd *cli;
2984         int rc = 0;
2985
2986         ENTRY;
2987         LASSERT(imp->imp_obd == obd);
2988
2989         switch (event) {
2990         case IMP_EVENT_DISCON: {
2991                 cli = &obd->u.cli;
2992                 client_obd_list_lock(&cli->cl_loi_list_lock);
2993                 cli->cl_avail_grant = 0;
2994                 cli->cl_lost_grant = 0;
2995                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2996                 break;
2997         }
2998         case IMP_EVENT_INACTIVE: {
2999                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3000                 break;
3001         }
3002         case IMP_EVENT_INVALIDATE: {
3003                 struct ldlm_namespace *ns = obd->obd_namespace;
3004                 struct lu_env         *env;
3005                 int                    refcheck;
3006
3007                 env = cl_env_get(&refcheck);
3008                 if (!IS_ERR(env)) {
3009                         /* Reset grants */
3010                         cli = &obd->u.cli;
3011                         /* all pages go to failing rpcs due to the invalid
3012                          * import */
3013                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3014
3015                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3016                         cl_env_put(env, &refcheck);
3017                 } else
3018                         rc = PTR_ERR(env);
3019                 break;
3020         }
3021         case IMP_EVENT_ACTIVE: {
3022                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3023                 break;
3024         }
3025         case IMP_EVENT_OCD: {
3026                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3027
3028                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3029                         osc_init_grant(&obd->u.cli, ocd);
3030
3031                 /* See bug 7198 */
3032                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3033                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3034
3035                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3036                 break;
3037         }
3038         case IMP_EVENT_DEACTIVATE: {
3039                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3040                 break;
3041         }
3042         case IMP_EVENT_ACTIVATE: {
3043                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3044                 break;
3045         }
3046         default:
3047                 CERROR("Unknown import event %d\n", event);
3048                 LBUG();
3049         }
3050         RETURN(rc);
3051 }
3052
3053 /**
3054  * Determine whether the lock can be canceled before replaying the lock
3055  * during recovery, see bug16774 for detailed information.
3056  *
3057  * \retval zero the lock can't be canceled
3058  * \retval other ok to cancel
3059  */
3060 static int osc_cancel_weight(struct ldlm_lock *lock)
3061 {
3062         /*
3063          * Cancel all unused and granted extent lock.
3064          */
3065         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3066             lock->l_granted_mode == lock->l_req_mode &&
3067             osc_ldlm_weigh_ast(lock) == 0)
3068                 RETURN(1);
3069
3070         RETURN(0);
3071 }
3072
3073 static int brw_queue_work(const struct lu_env *env, void *data)
3074 {
3075         struct client_obd *cli = data;
3076
3077         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3078
3079         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3080         RETURN(0);
3081 }
3082
/**
 * Set up an OSC device: generic client state, ptlrpcd work items for
 * writeback and LRU shrinking, quota state, procfs entries, and the
 * reserved request pool.
 *
 * On failure, everything already set up is unwound via the goto ladder
 * at the bottom.  Note that a procfs setup failure is deliberately
 * non-fatal: it only skips the extra proc attachments and the function
 * still returns 0.
 *
 * \param[in] obd	device being set up
 * \param[in] lcfg	configuration log record with setup parameters
 *
 * \retval 0		on success
 * \retval negative	errno on failure
 */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        struct obd_type   *type;
        void              *handler;
        int                rc;
        ENTRY;

        /* Hold a reference on the ptlrpcd threads for the work items below. */
        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

#ifdef LPROCFS
        obd->obd_vars = lprocfs_osc_obd_vars;
#endif
        /* If this is true then both client (osc) and server (osp) are on the
         * same node. The osp layer if loaded first will register the osc proc
         * directory. In that case this obd_device will be attached its proc
         * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type && type->typ_procsym) {
                obd->obd_proc_entry = lprocfs_seq_register(obd->obd_name,
                                                           type->typ_procsym,
                                                           obd->obd_vars, obd);
                if (IS_ERR(obd->obd_proc_entry)) {
                        rc = PTR_ERR(obd->obd_proc_entry);
                        CERROR("error %d setting up lprocfs for %s\n", rc,
                               obd->obd_name);
                        obd->obd_proc_entry = NULL;
                }
        } else {
                rc = lprocfs_seq_obd_setup(obd);
        }

        /* If the basic OSC proc tree construction succeeded then
         * lets do the rest. */
        if (rc == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }

        /* We need to allocate a few requests more, because
         * brw_interpret tries to create new requests before freeing
         * previous ones, Ideally we want to have 2x max_rpcs_in_flight
         * reserved, but I'm afraid that might be too much wasted RAM
         * in fact, so 2 is just my guess and still should work. */
        cli->cl_import->imp_rq_pool =
                ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                    OST_MAXREQSIZE,
                                    ptlrpc_add_rqs_to_pool);

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
        RETURN(0);

out_ptlrpcd_work:
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
3174
/**
 * Staged pre-cleanup of an OSC device, run before osc_cleanup().
 *
 * OBD_CLEANUP_EARLY deactivates and un-pings the import so no new RPCs
 * start; OBD_CLEANUP_EXPORTS then tears down the ptlrpcd work items,
 * the import, and the procfs/llog state.  Other stages are ignored.
 *
 * \param[in] obd	device being cleaned up
 * \param[in] stage	which cleanup phase to execute
 *
 * \retval 0 on success (llog finish errors are logged, not returned)
 */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                /* Stop the pinger from touching this import. */
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                struct client_obd *cli = &obd->u.cli;
                /* LU-464
                 * for echo client, export may be on zombie list, wait for
                 * zombie thread to cull it, because cli.cl_import will be
                 * cleared in client_disconnect_export():
                 *   class_export_destroy() -> obd_cleanup() ->
                 *   echo_device_free() -> echo_client_cleanup() ->
                 *   obd_disconnect() -> osc_disconnect() ->
                 *   client_disconnect_export()
                 */
                obd_zombie_barrier();
                if (cli->cl_writeback_work) {
                        ptlrpcd_destroy_work(cli->cl_writeback_work);
                        cli->cl_writeback_work = NULL;
                }
                if (cli->cl_lru_work) {
                        ptlrpcd_destroy_work(cli->cl_lru_work);
                        cli->cl_lru_work = NULL;
                }
                obd_cleanup_client_import(obd);
                ptlrpc_lprocfs_unregister_obd(obd);
                lprocfs_obd_cleanup(obd);
                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
                }
        }
        RETURN(rc);
}
3223
/**
 * Final cleanup of an OSC device (after osc_precleanup()).
 *
 * Detaches the client from the shared LRU cache, frees the quota cache,
 * runs generic client cleanup, and drops the ptlrpcd reference taken in
 * osc_setup().
 *
 * \param[in] obd	device being destroyed
 *
 * \retval rc from client_obd_cleanup()
 */
int osc_cleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        int rc;

        ENTRY;

        /* lru cleanup */
        if (cli->cl_cache != NULL) {
                LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
                /* unlink from the shared LRU list under its lock before
                 * dropping our reference on the cache */
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_del_init(&cli->cl_lru_osc);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);
                cli->cl_lru_left = NULL;
                atomic_dec(&cli->cl_cache->ccc_users);
                cli->cl_cache = NULL;
        }

        /* free memory of osc quota cache */
        osc_quota_cleanup(obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}
3250
3251 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3252 {
3253         int rc = class_process_proc_seq_param(PARAM_OSC, obd->obd_vars,
3254                                               lcfg, obd);
3255         return rc > 0 ? 0: rc;
3256 }
3257
/* obd_ops hook: \a buf carries a struct lustre_cfg; forward it to the
 * common OSC parameter handler.  \a len is unused here. */
static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
{
        return osc_process_config_base(obd, buf);
}
3262
/* Method table wiring the generic OBD layer to this OSC implementation.
 * Entries pointing at client_* functions use the generic client-OBD
 * implementations shared with other client devices. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_unpackmd             = osc_unpackmd,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_change_cbdata        = osc_change_cbdata,
        .o_find_cbdata          = osc_find_cbdata,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_process_config       = osc_process_config,
        .o_quotactl             = osc_quotactl,
        .o_quotacheck           = osc_quotacheck,
};
3292
3293 extern struct lu_kmem_descr osc_caches[];
3294 extern spinlock_t osc_ast_guard;
3295 extern struct lock_class_key osc_ast_guard_class;
3296
3297 int __init osc_init(void)
3298 {
3299         bool enable_proc = true;
3300         struct obd_type *type;
3301         int rc;
3302         ENTRY;
3303
3304         /* print an address of _any_ initialized kernel symbol from this
3305          * module, to allow debugging with gdb that doesn't support data
3306          * symbols from modules.*/
3307         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3308
3309         rc = lu_kmem_init(osc_caches);
3310         if (rc)
3311                 RETURN(rc);
3312
3313         type = class_search_type(LUSTRE_OSP_NAME);
3314         if (type != NULL && type->typ_procsym != NULL)
3315                 enable_proc = false;
3316
3317         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3318 #ifndef HAVE_ONLY_PROCFS_SEQ
3319                                  NULL,
3320 #endif
3321                                  LUSTRE_OSC_NAME, &osc_device_type);
3322         if (rc) {
3323                 lu_kmem_fini(osc_caches);
3324                 RETURN(rc);
3325         }
3326
3327         spin_lock_init(&osc_ast_guard);
3328         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3329
3330         RETURN(rc);
3331 }
3332
#ifdef __KERNEL__
/* Module unload: unregister the OSC type first so no new devices can be
 * created, then release the slab caches.
 * NOTE(review): the __exit attribute is deliberately commented out --
 * presumably so the symbol may be referenced outside the exit path;
 * confirm before restoring it. */
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

/* Register module entry/exit points with the libcfs module wrapper. */
cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif