Whamcloud - gitweb
Branch: HEAD
[fs/lustre-release.git] / lustre / ptlrpc / llog_client.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2004 Cluster File Systems, Inc.
5  *   Author: Andreas Dilger <adilger@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
25  *  remote api for llog - client side
26  *
27  */
28
29 #define DEBUG_SUBSYSTEM S_LOG
30
31 #ifndef EXPORT_SYMTAB
32 #define EXPORT_SYMTAB
33 #endif
34
35 #ifdef __KERNEL__
36 #include <libcfs/libcfs.h>
37 #else
38 #include <liblustre.h>
39 #endif
40
41 #include <obd_class.h>
42 #include <lustre_log.h>
43 #include <lustre_net.h>
44 #include <libcfs/list.h>
45
/*
 * Sample ctxt->loc_imp under ctxt->loc_sem and, if it is set, store the
 * result of class_import_get() on it in @imp.  The matching release is
 * LLOG_CLIENT_EXIT(), which calls class_import_put() on @imp.
 *
 * WARNING: this macro contains a hidden "return".  If the context has no
 * import it logs an error, sets @imp to NULL and makes the *enclosing
 * function* return -EINVAL.  Use it only in functions returning int, and
 * only before any resource that would need unwinding has been acquired.
 *
 * @ctxt is evaluated several times and @imp is assigned to: pass a
 * side-effect-free expression and an lvalue respectively.
 */
#define  LLOG_CLIENT_ENTRY(ctxt, imp) do {                            \
        mutex_down(&(ctxt)->loc_sem);                                 \
        if ((ctxt)->loc_imp) {                                        \
                (imp) = class_import_get((ctxt)->loc_imp);            \
        } else {                                                      \
                CERROR("ctxt->loc_imp == NULL for context idx %d. "   \
                       "Unable to complete MDS/OSS recovery, "        \
                       "but I'll try again next time.  Not fatal.\n", \
                       (ctxt)->loc_idx);                              \
                (imp) = NULL;                                         \
                mutex_up(&(ctxt)->loc_sem);                           \
                return (-EINVAL);                                     \
        }                                                             \
        mutex_up(&(ctxt)->loc_sem);                                   \
} while(0)
61
/*
 * Counterpart of LLOG_CLIENT_ENTRY(): calls class_import_put() on @imp
 * while holding ctxt->loc_sem.  If the context's import no longer matches
 * the one the caller has been using, a warning is logged showing the
 * caller's import (old) followed by the context's current import (new).
 *
 * @ctxt and @imp are evaluated more than once; pass side-effect-free
 * expressions.
 */
#define  LLOG_CLIENT_EXIT(ctxt, imp) do {                    \
        mutex_down(&(ctxt)->loc_sem);                        \
        if ((ctxt)->loc_imp != (imp))                        \
                CWARN("loc_imp has changed from %p to %p\n", \
                       (imp), (ctxt)->loc_imp);              \
        class_import_put(imp);                               \
        mutex_up(&(ctxt)->loc_sem);                          \
} while(0)
70
71 /* This is a callback from the llog_* functions.
72  * Assumes caller has already pushed us into the kernel context. */
73 static int llog_client_create(struct llog_ctxt *ctxt, struct llog_handle **res,
74                               struct llog_logid *logid, char *name)
75 {
76         struct obd_import     *imp;
77         struct llogd_body     *body;
78         struct llog_handle    *handle;
79         struct ptlrpc_request *req = NULL;
80         int                    rc;
81         ENTRY;
82
83         LLOG_CLIENT_ENTRY(ctxt, imp);
84
85         handle = llog_alloc_handle();
86         if (handle == NULL)
87                 RETURN(-ENOMEM);
88         *res = handle;
89
90         req = ptlrpc_request_alloc(imp, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
91         if (req == NULL)
92                 GOTO(err_free, rc = -ENOMEM);
93
94         if (name)
95                 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
96                                      strlen(name) + 1);
97
98         rc = ptlrpc_request_pack(req, LUSTRE_LOG_VERSION,
99                                  LLOG_ORIGIN_HANDLE_CREATE);
100         if (rc) {
101                 ptlrpc_request_free(req);
102                 GOTO(err_free, rc);
103         }
104         ptlrpc_request_set_replen(req);
105
106         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
107         if (logid)
108                 body->lgd_logid = *logid;
109         body->lgd_ctxt_idx = ctxt->loc_idx - 1;
110
111         if (name) {
112                 char *tmp;
113                 tmp = req_capsule_client_sized_get(&req->rq_pill, &RMF_NAME,
114                                                    strlen(name) + 1);
115                 LASSERT(tmp);
116                 strcpy(tmp, name);
117         }
118
119         rc = ptlrpc_queue_wait(req);
120         if (rc)
121                 GOTO(err_free, rc);
122
123         body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
124         if (body == NULL)
125                 GOTO(err_free, rc =-EFAULT);
126
127         handle->lgh_id = body->lgd_logid;
128         handle->lgh_ctxt = ctxt;
129         EXIT;
130 out:
131         LLOG_CLIENT_EXIT(ctxt, imp);
132         ptlrpc_req_finished(req);
133         return rc;
134 err_free:
135         llog_free_handle(handle);
136         goto out;
137 }
138
139 static int llog_client_destroy(struct llog_handle *loghandle)
140 {
141         struct obd_import     *imp;
142         struct ptlrpc_request *req = NULL;
143         struct llogd_body     *body;
144         int                    rc;
145         ENTRY;
146
147         LLOG_CLIENT_ENTRY(loghandle->lgh_ctxt, imp);
148         req = ptlrpc_request_alloc_pack(imp, &RQF_LLOG_ORIGIN_HANDLE_DESTROY,
149                                         LUSTRE_LOG_VERSION,
150                                         LLOG_ORIGIN_HANDLE_DESTROY);
151         if (req == NULL)
152                 GOTO(err_exit, rc =-ENOMEM);
153
154         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
155         body->lgd_logid = loghandle->lgh_id;
156         body->lgd_llh_flags = loghandle->lgh_hdr->llh_flags;
157
158         ptlrpc_request_set_replen(req);
159         rc = ptlrpc_queue_wait(req);
160         
161         ptlrpc_req_finished(req);
162 err_exit:
163         LLOG_CLIENT_EXIT(loghandle->lgh_ctxt, imp);
164         RETURN(rc);
165 }
166
167
168 static int llog_client_next_block(struct llog_handle *loghandle,
169                                   int *cur_idx, int next_idx,
170                                   __u64 *cur_offset, void *buf, int len)
171 {
172         struct obd_import     *imp;
173         struct ptlrpc_request *req = NULL;
174         struct llogd_body     *body;
175         void                  *ptr;
176         int                    rc;
177         ENTRY;
178
179         LLOG_CLIENT_ENTRY(loghandle->lgh_ctxt, imp);
180         req = ptlrpc_request_alloc_pack(imp, &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK,
181                                         LUSTRE_LOG_VERSION,
182                                         LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
183         if (req == NULL)
184                 GOTO(err_exit, rc =-ENOMEM);
185                 
186         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
187         body->lgd_logid = loghandle->lgh_id;
188         body->lgd_ctxt_idx = loghandle->lgh_ctxt->loc_idx - 1;
189         body->lgd_llh_flags = loghandle->lgh_hdr->llh_flags;
190         body->lgd_index = next_idx;
191         body->lgd_saved_index = *cur_idx;
192         body->lgd_len = len;
193         body->lgd_cur_offset = *cur_offset;
194
195         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER, len);
196         ptlrpc_request_set_replen(req);
197         rc = ptlrpc_queue_wait(req);
198         if (rc)
199                 GOTO(out, rc);
200
201         body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
202         if (body == NULL)
203                 GOTO(out, rc =-EFAULT);
204
205         /* The log records are swabbed as they are processed */
206         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
207         if (ptr == NULL)
208                 GOTO(out, rc =-EFAULT);
209
210         *cur_idx = body->lgd_saved_index;
211         *cur_offset = body->lgd_cur_offset;
212
213         memcpy(buf, ptr, len);
214         EXIT;
215 out:
216         ptlrpc_req_finished(req);
217 err_exit:
218         LLOG_CLIENT_EXIT(loghandle->lgh_ctxt, imp);
219         return rc;
220 }
221
222 static int llog_client_prev_block(struct llog_handle *loghandle,
223                                   int prev_idx, void *buf, int len)
224 {
225         struct obd_import     *imp;
226         struct ptlrpc_request *req = NULL;
227         struct llogd_body     *body;
228         void                  *ptr;
229         int                    rc;
230         ENTRY;
231
232         LLOG_CLIENT_ENTRY(loghandle->lgh_ctxt, imp);
233         req = ptlrpc_request_alloc_pack(imp, &RQF_LLOG_ORIGIN_HANDLE_PREV_BLOCK,
234                                         LUSTRE_LOG_VERSION,
235                                         LLOG_ORIGIN_HANDLE_PREV_BLOCK);
236         if (req == NULL)
237                 GOTO(err_exit, rc = -ENOMEM);
238
239         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
240         body->lgd_logid = loghandle->lgh_id;
241         body->lgd_ctxt_idx = loghandle->lgh_ctxt->loc_idx - 1;
242         body->lgd_llh_flags = loghandle->lgh_hdr->llh_flags;
243         body->lgd_index = prev_idx;
244         body->lgd_len = len;
245
246         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER, len);
247         ptlrpc_request_set_replen(req);
248
249         rc = ptlrpc_queue_wait(req);
250         if (rc)
251                 GOTO(out, rc);
252
253         body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
254         if (body == NULL)
255                 GOTO(out, rc =-EFAULT);
256
257         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
258         if (ptr == NULL)
259                 GOTO(out, rc =-EFAULT);
260
261         memcpy(buf, ptr, len);
262         EXIT;
263 out:
264         ptlrpc_req_finished(req);
265 err_exit:
266         LLOG_CLIENT_EXIT(loghandle->lgh_ctxt, imp);
267         return rc;
268 }
269
270 static int llog_client_read_header(struct llog_handle *handle)
271 {
272         struct obd_import     *imp;
273         struct ptlrpc_request *req = NULL;
274         struct llogd_body     *body;
275         struct llog_log_hdr   *hdr;
276         struct llog_rec_hdr   *llh_hdr;
277         int                    rc;
278         ENTRY;
279
280         LLOG_CLIENT_ENTRY(handle->lgh_ctxt, imp);
281         req = ptlrpc_request_alloc_pack(imp,&RQF_LLOG_ORIGIN_HANDLE_READ_HEADER,
282                                         LUSTRE_LOG_VERSION,
283                                         LLOG_ORIGIN_HANDLE_READ_HEADER);
284         if (req == NULL)
285                 GOTO(err_exit, rc = -ENOMEM);
286
287         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
288         body->lgd_logid = handle->lgh_id;
289         body->lgd_ctxt_idx = handle->lgh_ctxt->loc_idx - 1;
290         body->lgd_llh_flags = handle->lgh_hdr->llh_flags;
291
292         ptlrpc_request_set_replen(req);
293         rc = ptlrpc_queue_wait(req);
294         if (rc)
295                 GOTO(out, rc);
296
297         hdr = req_capsule_server_get(&req->rq_pill, &RMF_LLOG_LOG_HDR);
298         if (hdr == NULL)
299                 GOTO(out, rc =-EFAULT);
300
301         memcpy(handle->lgh_hdr, hdr, sizeof (*hdr));
302         handle->lgh_last_idx = handle->lgh_hdr->llh_tail.lrt_index;
303
304         /* sanity checks */
305         llh_hdr = &handle->lgh_hdr->llh_hdr;
306         if (llh_hdr->lrh_type != LLOG_HDR_MAGIC) {
307                 CERROR("bad log header magic: %#x (expecting %#x)\n",
308                        llh_hdr->lrh_type, LLOG_HDR_MAGIC);
309                 rc = -EIO;
310         } else if (llh_hdr->lrh_len != LLOG_CHUNK_SIZE) {
311                 CERROR("incorrectly sized log header: %#x "
312                        "(expecting %#x)\n",
313                        llh_hdr->lrh_len, LLOG_CHUNK_SIZE);
314                 CERROR("you may need to re-run lconf --write_conf.\n");
315                 rc = -EIO;
316         }
317         EXIT;
318 out:
319         ptlrpc_req_finished(req);
320 err_exit:
321         LLOG_CLIENT_EXIT(handle->lgh_ctxt, imp);
322         return rc;
323 }
324
/*
 * Close callback for client-side llog handles.
 *
 * Deliberately a no-op that always reports success: no
 * LLOG_ORIGIN_HANDLE_CLOSE RPC is sent because the servers all close the
 * file at the end of every other LLOG_ RPC.  @handle is not used.
 */
static int llog_client_close(struct llog_handle *handle)
{
        return 0;
}
332
333
334 struct llog_operations llog_client_ops = {
335         lop_next_block:  llog_client_next_block,
336         lop_prev_block:  llog_client_prev_block,
337         lop_read_header: llog_client_read_header,
338         lop_create:      llog_client_create,
339         lop_destroy:     llog_client_destroy,
340         lop_close:       llog_client_close,
341 };