0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 #define _GNU_SOURCE
0011 #include <fcntl.h>
0012 #include <stdio.h>
0013 #include <stdlib.h>
0014 #include <unistd.h>
0015 #include <sys/syscall.h>
0016 #include "trace-agent.h"
0017
0018 #define READ_WAIT_USEC 100000
0019
0020 void *rw_thread_info_new(void)
0021 {
0022 struct rw_thread_info *rw_ti;
0023
0024 rw_ti = zalloc(sizeof(struct rw_thread_info));
0025 if (rw_ti == NULL) {
0026 pr_err("rw_thread_info zalloc error\n");
0027 exit(EXIT_FAILURE);
0028 }
0029
0030 rw_ti->cpu_num = -1;
0031 rw_ti->in_fd = -1;
0032 rw_ti->out_fd = -1;
0033 rw_ti->read_pipe = -1;
0034 rw_ti->write_pipe = -1;
0035 rw_ti->pipe_size = PIPE_INIT;
0036
0037 return rw_ti;
0038 }
0039
0040 void *rw_thread_init(int cpu, const char *in_path, const char *out_path,
0041 bool stdout_flag, unsigned long pipe_size,
0042 struct rw_thread_info *rw_ti)
0043 {
0044 int data_pipe[2];
0045
0046 rw_ti->cpu_num = cpu;
0047
0048
0049 rw_ti->in_fd = open(in_path, O_RDONLY);
0050 if (rw_ti->in_fd == -1) {
0051 pr_err("Could not open in_fd (CPU:%d)\n", cpu);
0052 goto error;
0053 }
0054
0055
0056 if (!stdout_flag) {
0057
0058 rw_ti->out_fd = open(out_path, O_WRONLY);
0059 if (rw_ti->out_fd == -1) {
0060 pr_err("Could not open out_fd (CPU:%d)\n", cpu);
0061 goto error;
0062 }
0063 } else
0064
0065 rw_ti->out_fd = STDOUT_FILENO;
0066
0067 if (pipe2(data_pipe, O_NONBLOCK) < 0) {
0068 pr_err("Could not create pipe in rw-thread(%d)\n", cpu);
0069 goto error;
0070 }
0071
0072
0073
0074
0075
0076 if (fcntl(*data_pipe, F_SETPIPE_SZ, pipe_size) < 0) {
0077 pr_err("Could not change pipe size in rw-thread(%d)\n", cpu);
0078 goto error;
0079 }
0080
0081 rw_ti->read_pipe = data_pipe[1];
0082 rw_ti->write_pipe = data_pipe[0];
0083 rw_ti->pipe_size = pipe_size;
0084
0085 return NULL;
0086
0087 error:
0088 exit(EXIT_FAILURE);
0089 }
0090
0091
0092 static void bind_cpu(int cpu_num)
0093 {
0094 cpu_set_t mask;
0095
0096 CPU_ZERO(&mask);
0097 CPU_SET(cpu_num, &mask);
0098
0099
0100 if (sched_setaffinity(0, sizeof(mask), &mask) == -1)
0101 pr_err("Could not set CPU#%d affinity\n", (int)cpu_num);
0102 }
0103
0104 static void *rw_thread_main(void *thread_info)
0105 {
0106 ssize_t rlen, wlen;
0107 ssize_t ret;
0108 struct rw_thread_info *ts = (struct rw_thread_info *)thread_info;
0109
0110 bind_cpu(ts->cpu_num);
0111
0112 while (1) {
0113
0114 if (!global_run_operation) {
0115 pthread_mutex_lock(&mutex_notify);
0116 pthread_cond_wait(&cond_wakeup, &mutex_notify);
0117 pthread_mutex_unlock(&mutex_notify);
0118 }
0119
0120 if (global_sig_receive)
0121 break;
0122
0123
0124
0125
0126
0127 rlen = splice(ts->in_fd, NULL, ts->read_pipe, NULL,
0128 ts->pipe_size, SPLICE_F_MOVE | SPLICE_F_MORE);
0129
0130 if (rlen < 0) {
0131 pr_err("Splice_read in rw-thread(%d)\n", ts->cpu_num);
0132 goto error;
0133 } else if (rlen == 0) {
0134
0135
0136
0137
0138
0139
0140 usleep(READ_WAIT_USEC);
0141 pr_debug("Read retry(cpu:%d)\n", ts->cpu_num);
0142 continue;
0143 }
0144
0145 wlen = 0;
0146
0147 do {
0148 ret = splice(ts->write_pipe, NULL, ts->out_fd, NULL,
0149 rlen - wlen,
0150 SPLICE_F_MOVE | SPLICE_F_MORE);
0151
0152 if (ret < 0) {
0153 pr_err("Splice_write in rw-thread(%d)\n",
0154 ts->cpu_num);
0155 goto error;
0156 } else if (ret == 0)
0157
0158
0159
0160
0161
0162
0163
0164
0165
0166 sleep(1);
0167 wlen += ret;
0168 } while (wlen < rlen);
0169 }
0170
0171 return NULL;
0172
0173 error:
0174 exit(EXIT_FAILURE);
0175 }
0176
0177
0178 pthread_t rw_thread_run(struct rw_thread_info *rw_ti)
0179 {
0180 int ret;
0181 pthread_t rw_thread_per_cpu;
0182
0183 ret = pthread_create(&rw_thread_per_cpu, NULL, rw_thread_main, rw_ti);
0184 if (ret != 0) {
0185 pr_err("Could not create a rw thread(%d)\n", rw_ti->cpu_num);
0186 exit(EXIT_FAILURE);
0187 }
0188
0189 return rw_thread_per_cpu;
0190 }