00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "config.h"
00019
00020 #include <assert_pp.h>
00021 #include <time_util.h>
00022 #include <instrumentation.h>
00023
00024 #include <iostream>
00025
00026 #include <rk.h>
00027 #include <rk_util.h>
00028
00029 #include "RKObserverAdvocate.hh"
00030
00031 using namespace std;
00032
#if !defined(timespecclear)

/*
 * Fallback helpers for manipulating "struct timespec" values.  They are
 * only defined when timespecclear is not already available.  Every macro
 * takes POINTERS to timespecs and may evaluate its arguments more than
 * once, so do not pass expressions with side effects.
 */

/* Zero both the seconds and the nanoseconds fields of *ts. */
#define timespecclear(ts) ((ts)->tv_sec = (ts)->tv_nsec = 0)

/* Non-zero when either field of *ts is non-zero. */
#define timespecisset(ts) ((ts)->tv_sec != 0 || (ts)->tv_nsec != 0)

/*
 * Compare *lhs against *rhs with the relational operator "op" (for example
 * <, >= or ==).  The nanoseconds are only consulted when the seconds tie.
 */
#define timespeccmp(lhs, rhs, op)               \
    (((lhs)->tv_sec == (rhs)->tv_sec) ?         \
     ((lhs)->tv_nsec op (rhs)->tv_nsec) :       \
     ((lhs)->tv_sec op (rhs)->tv_sec))

/*
 * In-place addition: *acc += *delta.  A single carry from the nanoseconds
 * into the seconds is performed when the nanosecond sum reaches one second.
 */
#define timespecadd(acc, delta)                 \
    do {                                        \
        (acc)->tv_sec += (delta)->tv_sec;       \
        (acc)->tv_nsec += (delta)->tv_nsec;     \
        if ((acc)->tv_nsec >= 1000000000L) {    \
            (acc)->tv_sec++;                    \
            (acc)->tv_nsec -= 1000000000L;      \
        }                                       \
    } while (0)

/*
 * In-place subtraction: *acc -= *delta.  A single borrow from the seconds
 * is performed when the nanosecond difference goes negative.
 */
#define timespecsub(acc, delta)                 \
    do {                                        \
        (acc)->tv_sec -= (delta)->tv_sec;       \
        (acc)->tv_nsec -= (delta)->tv_nsec;     \
        if ((acc)->tv_nsec < 0) {               \
            (acc)->tv_sec--;                    \
            (acc)->tv_nsec += 1000000000L;      \
        }                                       \
    } while (0)

#endif
00092
00093
00094
00095
00096
00097
00098
00099
00100 static void *trampoline(void *arg)
00101 {
00102 RKObserverAdvocate *rkoa = (RKObserverAdvocate *)arg;
00103 void *retval = NULL;
00104
00105 require(arg != NULL);
00106
00107 rkoa->run();
00108
00109 return( retval );
00110 }
00111
00112 RKObserverAdvocate::RKObserverAdvocate(void)
00113 throw (CORBA::SystemException)
00114 {
00115 if( pthread_mutex_init(&this->rkoa_Mutex, NULL) != 0 )
00116 {
00117 ensure(0);
00118 }
00119 if( pthread_cond_init(&this->rkoa_Cond, NULL) != 0 )
00120 {
00121 ensure(0);
00122 }
00123 this->rkoa_ResourceSet = NULL_RESOURCE_SET;
00124 this->rkoa_Period = microsec_to_timespec(10000);
00125 this->rkoa_Thread = 0;
00126 }
00127
00128 RKObserverAdvocate::~RKObserverAdvocate(void)
00129 {
00130 this->rkoa_Thread = 0;
00131 if( pthread_mutex_destroy(&this->rkoa_Mutex) != 0 )
00132 {
00133 ensure(0);
00134 }
00135 if( pthread_cond_destroy(&this->rkoa_Cond) != 0 )
00136 {
00137 ensure(0);
00138 }
00139 }
00140
00141 void
00142 RKObserverAdvocate::BeginCPUScheduling(const Broker::ScheduleParameters &sp)
00143 throw (CORBA::SystemException,
00144 Broker::DuplicateScheduleParameter,
00145 Broker::InvalidScheduleParameter,
00146 Broker::MissingScheduleParameter)
00147 {
00148 unsigned long long period = ~0;
00149 unsigned int lpc;
00150 const char *str;
00151 pid_t pid = -1;
00152
00153 if( CORBA::is_nil(this->dm_RemoteObject.in()) )
00154 {
00155 throw CORBA::BAD_INV_ORDER();
00156 }
00157
00158 if( this->rkoa_ResourceSet != NULL_RESOURCE_SET )
00159 {
00160 throw CORBA::BAD_INV_ORDER();
00161 }
00162
00163
00164 for( lpc = 0; lpc < sp.length(); lpc++ )
00165 {
00166 if( strcasecmp(sp[lpc].name.in(), "pid") == 0 )
00167 {
00168 CORBA::Long c_pid;
00169 const char *str;
00170
00171 if( pid != -1 )
00172 {
00173 throw Broker::DuplicateScheduleParameter("pid");
00174 }
00175 else if( sp[lpc].value >>= c_pid )
00176 {
00177 pid = c_pid;
00178 }
00179 else if( (sp[lpc].value >>= str) &&
00180 (sscanf(str, "%d", &pid) == 1) )
00181 {
00182 }
00183 else
00184 {
00185 throw Broker::InvalidScheduleParameter("'pid' not a long",
00186 sp[lpc]);
00187 }
00188 if( pid < 0 )
00189 {
00190 pid = -1;
00191 throw Broker::InvalidScheduleParameter("'pid' not a long",
00192 sp[lpc]);
00193 }
00194 }
00195 else if( strcasecmp(sp[lpc].name.in(), "period") == 0 )
00196 {
00197 CORBA::ULong period_ul;
00198
00199 if( period != ~0ULL )
00200 {
00201 throw Broker::DuplicateScheduleParameter("period");
00202 }
00203 else if( sp[lpc].value >>= period_ul )
00204 {
00205 period = period_ul;
00206 }
00207 else if( (sp[lpc].value >>= str) &&
00208 string_to_microsec(&period, str) )
00209 {
00210 }
00211 else
00212 {
00213 throw Broker::InvalidScheduleParameter(
00214 "'period' is not a time",
00215 sp[lpc]);
00216 }
00217 }
00218 }
00219
00220 if( pid < -1 )
00221 {
00222 throw CORBA::BAD_PARAM();
00223 }
00224
00225 this->dm_RemoteObject->BeginCPUScheduling(sp);
00226
00227 this->rkoa_Period = microsec_to_timespec(period);
00228
00229 this->rkoa_Advocate = Broker::RealTimeTask::_duplicate(this->_this());
00230
00231 {
00232 this->rkoa_ResourceSet = rk_proc_get_rset(pid);
00233
00234 ensure(this->rkoa_ResourceSet != NULL_RESOURCE_SET);
00235 }
00236
00237 if( pthread_create(&this->rkoa_Thread, NULL, trampoline, this) != 0 )
00238 {
00239 this->dm_RemoteObject->EndCPUScheduling();
00240 throw CORBA::NO_MEMORY();
00241 }
00242 }
00243
00244 void RKObserverAdvocate::EndCPUScheduling(void)
00245 throw (CORBA::SystemException)
00246 {
00247 pthread_t pt;
00248 void *rc;
00249
00250 if( pthread_mutex_lock(&this->rkoa_Mutex) != 0 )
00251 {
00252 ensure(0);
00253 }
00254 {
00255 this->rkoa_ResourceSet = NULL_RESOURCE_SET;
00256 pt = this->rkoa_Thread;
00257 this->rkoa_Thread = 0;
00258 if( pthread_cond_signal(&this->rkoa_Cond) != 0 )
00259 {
00260 ensure(0);
00261 }
00262 }
00263 if( pthread_mutex_unlock(&this->rkoa_Mutex) != 0 )
00264 {
00265 ensure(0);
00266 }
00267 if( pthread_join(pt, &rc) != 0 )
00268 {
00269 ensure(0);
00270 }
00271
00272 this->dm_RemoteObject->EndCPUScheduling();
00273 }
00274
00275 void RKObserverAdvocate::run(void)
00276 {
00277 struct rk_resource_set_proc proc_elements[128];
00278 struct rk_resource_set_proc_cache pc;
00279 Broker::KeyedReportParameters krp;
00280 unsigned long long last_usage = 0;
00281 struct rk_resource_set_usage rsu;
00282 struct timespec abstime;
00283 pid_t my_pid;
00284 int rc;
00285
00286 pc.data = proc_elements;
00287 pc.used = 0;
00288 pc.length = 128;
00289 memset(&rsu, 0, sizeof(rsu));
00290 my_pid = getpid();
00291 if( rk_resource_set_detach_process(rk_proc_get_rset(my_pid),
00292 my_pid) == -1 )
00293 {
00294 perror("rk_resource_set_detach_process");
00295 }
00296 if( rk_resource_set_attach_process(this->rkoa_ResourceSet,
00297 my_pid) == -1 )
00298 {
00299 perror("rk_resource_set_attach_process");
00300 }
00301 clock_gettime(CLOCK_REALTIME, &abstime);
00302 if( (rc = pthread_mutex_lock(&this->rkoa_Mutex)) != 0 )
00303 {
00304 cerr << "pthread_mutex_lock: " << rc << endl;
00305 ensure(0);
00306 }
00307 do {
00308 if( this->rkoa_ResourceSet == NULL_RESOURCE_SET )
00309 {
00310
00311 }
00312 else
00313 {
00314 int rc;
00315
00316 if( (rc = rk_resource_set_get_usage(this->rkoa_ResourceSet,
00317 &rsu,
00318 &pc)) != 0 )
00319 {
00320 cerr << "rk_resource_set_get_usage: "
00321 << strerror(rc)
00322 << endl;
00323 }
00324 else
00325 {
00326 try
00327 {
00328 unsigned long long total_usage;
00329 Broker::CPUReserve status;
00330
00331 total_usage = rsu.inactive_cpu_usage +
00332 rsu.active_cpu_usage;
00333 status.Period = timespec_to_microsec(&this->rkoa_Period);
00334 status.Compute = (CORBA::ULong)
00335 (total_usage - last_usage) + 50;
00336
00337
00338
00339
00340
00341 this->PassCPU(this->rkoa_Advocate, status, status, krp);
00342 last_usage = total_usage;
00343 }
00344 catch(const CORBA::SystemException &e)
00345 {
00346 cerr << "System exception: " << e << endl;
00347 }
00348 catch(...)
00349 {
00350 cerr << "Caught unknown exception..." << endl;
00351 }
00352 }
00353 timespecadd(&abstime, &this->rkoa_Period);
00354 rc = pthread_cond_timedwait(&this->rkoa_Cond,
00355 &this->rkoa_Mutex,
00356 &abstime);
00357 switch( rc )
00358 {
00359 case 0:
00360
00361 break;
00362 case ETIMEDOUT:
00363
00364 break;
00365 default:
00366 cerr << "pthread_cond_timedwait: " << rc << endl;
00367 ensure(0);
00368 break;
00369 }
00370 }
00371 } while( this->rkoa_Thread != 0 );
00372 if( pthread_mutex_unlock(&this->rkoa_Mutex) != 0 )
00373 {
00374 ensure(0);
00375 }
00376 rk_resource_set_proc_cache_release(&pc);
00377 }