Page Menu
Home
GnuPG
Search
Configure Global Search
Log In
Files
F36623055
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Size
51 KB
Subscribers
None
View Options
diff --git a/scripts/daemon/exec/exec_daemon.php b/scripts/daemon/exec/exec_daemon.php
index ab74112..2cb206e 100755
--- a/scripts/daemon/exec/exec_daemon.php
+++ b/scripts/daemon/exec/exec_daemon.php
@@ -1,125 +1,127 @@
#!/usr/bin/env php
<?php
+declare(ticks = 1);
+
// Bootstrap the script environment; the path is resolved relative to this
// file.
require_once dirname(__FILE__).'/../../__init_script__.php';
// When stdout is not a terminal, move this process into a new session. A
// non-positive return value from posix_setsid() is treated as a failure.
if (!posix_isatty(STDOUT)) {
$sid = posix_setsid();
if ($sid <= 0) {
throw new Exception(pht('Failed to create new process session!'));
}
}
// Command line handling: three flags, an optional label, and a wildcard
// argument which collects the daemon class name.
$args = new PhutilArgumentParser($argv);
$args->setTagline(pht('daemon executor'));
$args->setSynopsis(<<<EOHELP
**exec_daemon.php** [__options__] __daemon__ ...
Run an instance of __daemon__.
EOHELP
);
$args->parse(
array(
array(
'name' => 'trace',
'help' => pht('Enable debug tracing.'),
),
array(
'name' => 'trace-memory',
'help' => pht('Enable debug memory tracing.'),
),
array(
'name' => 'verbose',
'help' => pht('Enable verbose activity logging.'),
),
array(
'name' => 'label',
'short' => 'l',
'param' => 'label',
'help' => pht(
'Optional process label. Makes "%s" nicer, no behavioral effects.',
'ps'),
),
array(
'name' => 'daemon',
'wildcard' => true,
),
));
// Memory tracing implies trace mode.
$trace_memory = $args->getArg('trace-memory');
$trace_mode = $args->getArg('trace') || $trace_memory;
$verbose = $args->getArg('verbose');
// If a human is attached to stdin, tell them the script is waiting for input.
if (function_exists('posix_isatty') && posix_isatty(STDIN)) {
fprintf(STDERR, pht('Reading daemon configuration from stdin...')."\n");
}
// The daemon configuration arrives as a JSON document on stdin. Only the
// keys listed below are accepted, and all of them are optional.
$config = @file_get_contents('php://stdin');
$config = id(new PhutilJSONParser())->parse($config);
PhutilTypeSpec::checkMap(
$config,
array(
'log' => 'optional string|null',
'argv' => 'optional list<wild>',
'load' => 'optional list<string>',
'autoscale' => 'optional wild',
));
// When a log file is configured, send PHP errors to it and install the
// daemon error listener (see PhutilDaemon::errorListener()).
$log = idx($config, 'log');
if ($log) {
ini_set('error_log', $log);
PhutilErrorHandler::setErrorListener(array('PhutilDaemon', 'errorListener'));
}
// Load any extra libraries named in the configuration before the daemon
// class is looked up.
$load = idx($config, 'load', array());
foreach ($load as $library) {
$library = Filesystem::resolvePath($library);
phutil_load_library($library);
}
PhutilErrorHandler::initialize();
// Exactly one daemon class must be named, it must exist, and it must extend
// PhutilDaemon.
$daemon = $args->getArg('daemon');
if (!$daemon) {
throw new PhutilArgumentUsageException(
pht('Specify which class of daemon to start.'));
} else if (count($daemon) > 1) {
throw new PhutilArgumentUsageException(
pht('Specify exactly one daemon to start.'));
} else {
$daemon = head($daemon);
if (!class_exists($daemon)) {
throw new PhutilArgumentUsageException(
pht(
'No class "%s" exists in any known library.',
$daemon));
} else if (!is_subclass_of($daemon, 'PhutilDaemon')) {
throw new PhutilArgumentUsageException(
pht(
'Class "%s" is not a subclass of "%s".',
$daemon,
'PhutilDaemon'));
}
}
// Note that $argv is reused here: from this point on it holds the daemon
// arguments from the configuration, not the script's command line.
$argv = idx($config, 'argv', array());
$daemon = newv($daemon, array($argv));
// Apply the command line flags and the optional autoscale configuration, then
// hand control to the daemon.
if ($trace_mode) {
$daemon->setTraceMode();
}
if ($trace_memory) {
$daemon->setTraceMemory();
}
if ($verbose) {
$daemon->setVerbose(true);
}
$autoscale = idx($config, 'autoscale');
if ($autoscale) {
$daemon->setAutoscaleProperties($autoscale);
}
$daemon->execute();
diff --git a/src/daemon/PhutilDaemon.php b/src/daemon/PhutilDaemon.php
index 4d7afe6..43a6c3f 100644
--- a/src/daemon/PhutilDaemon.php
+++ b/src/daemon/PhutilDaemon.php
@@ -1,396 +1,410 @@
<?php
/**
* Scaffolding for implementing robust background processing scripts.
*
*
* Autoscaling
* ===========
*
* Autoscaling automatically launches copies of a daemon when it is busy
* (scaling the pool up) and stops them when they're idle (scaling the pool
* down). This is appropriate for daemons which perform highly parallelizable
* work.
*
* To make a daemon support autoscaling, the implementation should look
* something like this:
*
* while (!$this->shouldExit()) {
* if (work_available()) {
* $this->willBeginWork();
* do_work();
* $this->sleep(0);
* } else {
* $this->willBeginIdle();
* $this->sleep(1);
* }
* }
*
* In particular, call @{method:willBeginWork} before becoming busy, and
* @{method:willBeginIdle} when no work is available. If the daemon is launched
* into an autoscale pool, this will cause the pool to automatically scale up
* when busy and down when idle.
*
* See @{class:PhutilHighIntensityIntervalDaemon} for an example of a simple
* autoscaling daemon.
*
* Launching a daemon which does not make these callbacks into an autoscale
* pool will have no effect.
*
* @task overseer Communicating With the Overseer
* @task autoscale Autoscaling Daemon Pools
*/
abstract class PhutilDaemon extends Phobject {
const MESSAGETYPE_STDOUT = 'stdout';
const MESSAGETYPE_HEARTBEAT = 'heartbeat';
const MESSAGETYPE_BUSY = 'busy';
const MESSAGETYPE_IDLE = 'idle';
const MESSAGETYPE_DOWN = 'down';
const WORKSTATE_BUSY = 'busy';
const WORKSTATE_IDLE = 'idle';
private $argv;
private $traceMode;
private $traceMemory;
private $verbose;
private $notifyReceived;
private $inGracefulShutdown;
private $workState = null;
private $idleSince = null;
private $autoscaleProperties = array();
/**
 * Turn verbose logging on or off. Messages passed to log() are only written
 * while this flag is truthy.
 *
 * @param bool Verbose flag.
 * @return this
 */
final public function setVerbose($verbose) {
  $this->verbose = $verbose;

  return $this;
}
/**
 * Read the verbose logging flag.
 *
 * @return wild Whatever was last passed to setVerbose().
 */
final public function getVerbose() {
  $verbose = $this->verbose;
  return $verbose;
}
private static $sighandlerInstalled;
/**
 * Construct a daemon: remember its arguments, install signal handlers,
 * switch the service profiler into discard mode and start capturing stdout.
 *
 * @param array Arguments, later available through getArgv().
 */
final public function __construct(array $argv) {
- declare(ticks = 1);
$this->argv = $argv;
// The SIGTERM handler is a static method, so it only needs to be registered
// once per process regardless of how many daemons are constructed.
if (!self::$sighandlerInstalled) {
self::$sighandlerInstalled = true;
pcntl_signal(SIGTERM, __CLASS__.'::exitOnSignal');
}
// SIGINT and SIGUSR2 are bound to this particular instance.
- pcntl_signal(SIGINT, array($this, 'onGracefulSignal'));
+ pcntl_signal(SIGINT, array($this, 'onGracefulSignal'));
pcntl_signal(SIGUSR2, array($this, 'onNotifySignal'));
// Without discard mode, this consumes unbounded amounts of memory. Keep
// memory bounded.
PhutilServiceProfiler::getInstance()->enableDiscardMode();
$this->beginStdoutCapture();
}
/**
 * Flush and close the stdout capture buffer opened by the constructor.
 */
final public function __destruct() {
  $this->endStdoutCapture();
}
/**
 * Emit a heartbeat message to the overseer. When memory tracing is enabled,
 * also write the current memory usage of this process to stderr.
 */
final public function stillWorking() {
$this->emitOverseerMessage(self::MESSAGETYPE_HEARTBEAT, null);
if ($this->traceMemory) {
$daemon = get_class($this);
fprintf(
STDERR,
- "<%s> %s %s\n",
+ "%s %s %s\n",
'<RAMS>',
$daemon,
pht(
'Memory Usage: %s KB',
new PhutilNumber(memory_get_usage() / 1024, 1)));
}
}
/**
 * Test whether a graceful shutdown has been requested.
 *
 * @return wild Truthy once the daemon has entered graceful shutdown.
 */
final public function shouldExit() {
  $shutting_down = $this->inGracefulShutdown;
  return $shutting_down;
}
/**
 * Sleep for roughly `$duration` seconds, sending heartbeats along the way.
 *
 * The sleep ends early when a notify signal is received or when a graceful
 * shutdown begins. A cloned autoscale daemon which has been idle for longer
 * than its scale-down duration asks the overseer not to restart it and
 * begins shutting down.
 *
 * @param int Number of seconds to sleep.
 * @return void
 */
final protected function sleep($duration) {
  $this->notifyReceived = false;
  $this->willSleep($duration);
  $this->stillWorking();

  $is_clone = $this->isClonedAutoscaleDaemon();
  $idle_limit = $this->getAutoscaleDownDuration();

  // Sleep in slices of at most 60 seconds. Clones use a slice no longer than
  // the scale-down duration so the idle check below runs often enough.
  $slice = 60;
  if ($is_clone) {
    $slice = min($slice, $idle_limit);
  }

  if ($is_clone && $this->workState == self::WORKSTATE_IDLE) {
    $idle_for = (time() - $this->idleSince);
    $this->log(pht('Idle for %s seconds.', $idle_for));
  }

  while (true) {
    if (!($duration > 0) || $this->notifyReceived || $this->shouldExit()) {
      break;
    }

    // If this is an autoscaling clone and it has been idle for too long,
    // scale the pool down by exiting and not restarting. The DOWN message
    // tells the overseer that this daemon does not want to be restarted.
    $is_idle_clone =
      $is_clone && ($this->workState == self::WORKSTATE_IDLE);
    if ($is_idle_clone &&
        $this->idleSince &&
        ($this->idleSince + $idle_limit < time())) {
      $this->inGracefulShutdown = true;
      $this->emitOverseerMessage(self::MESSAGETYPE_DOWN, null);
      $this->log(
        pht(
          'Daemon was idle for more than %s second(s), '.
          'scaling pool down.',
          new PhutilNumber($idle_limit)));
      break;
    }

    sleep(min($duration, $slice));
    $duration -= $slice;
    $this->stillWorking();
  }
}
/**
 * Hook invoked by sleep() before the daemon goes to sleep. The default
 * implementation does nothing.
 *
 * @param int Duration passed to sleep().
 * @return void
 */
protected function willSleep($duration) {
  // Intentionally empty: subclasses may override this.
  return;
}
/**
 * Signal handler which terminates the process. The constructor registers
 * it for SIGTERM. The exit status is 128 plus the signal number.
 *
 * @param int Signal number.
 */
public static function exitOnSignal($signo) {
- // Normally, PHP doesn't invoke destructors when existing in response to
+ self::didCatchSignal($signo);
+
+ // Normally, PHP doesn't invoke destructors when exiting in response to
// a signal. This forces it to do so, so we have a fighting chance of
// releasing any locks, leases or resources on our way out.
exit(128 + $signo);
}
/**
 * Get the arguments this daemon was constructed with.
 *
 * @return array Arguments passed to the constructor.
 */
final protected function getArgv() {
  $argv = $this->argv;
  return $argv;
}
/**
 * Entry point for running the daemon: invokes the willRun() hook and then
 * the subclass's run() implementation.
 */
final public function execute() {
  // Give subclasses a chance to prepare before the main loop starts.
  $this->willRun();

  $this->run();
}
/**
 * Main body of the daemon, supplied by each subclass. Called by execute()
 * after willRun().
 */
abstract protected function run();
/**
 * Enable memory tracing. While enabled, stillWorking() reports memory usage
 * on stderr with every heartbeat.
 *
 * @return this
 */
final public function setTraceMemory() {
  $this->traceMemory = true;

  return $this;
}
/**
 * Read the memory tracing flag.
 *
 * @return wild True after setTraceMemory() has been called.
 */
final public function getTraceMemory() {
  $trace_memory = $this->traceMemory;
  return $trace_memory;
}
/**
 * Enable trace mode: install the service profiler echo listener, turn on
 * console server logging and notify subclasses via didSetTraceMode().
 *
 * @return this
 */
final public function setTraceMode() {
  $this->traceMode = true;

  PhutilServiceProfiler::installEchoListener();

  $server = PhutilConsole::getConsole()->getServer();
  $server->setEnableLog(true);

  $this->didSetTraceMode();

  return $this;
}
/**
 * Read the trace mode flag.
 *
 * @return wild True after setTraceMode() has been called.
 */
final public function getTraceMode() {
  $trace_mode = $this->traceMode;
  return $trace_mode;
}
/**
 * Signal handler which requests a graceful shutdown. The constructor
 * registers it for SIGINT. It only sets a flag; shouldExit() reports it.
 *
 * @param int Signal number.
 */
final public function onGracefulSignal($signo) {
+ self::didCatchSignal($signo);
$this->inGracefulShutdown = true;
}
/**
 * Signal handler for notifications. The constructor registers it for
 * SIGUSR2. Setting the flag ends any sleep() in progress; the signal is then
 * forwarded to the onNotify() subclass hook.
 *
 * @param int Signal number.
 */
final public function onNotifySignal($signo) {
+ self::didCatchSignal($signo);
$this->notifyReceived = true;
$this->onNotify($signo);
}
/**
 * Hook called from onNotifySignal() when a notify signal arrives. The
 * default implementation does nothing.
 *
 * @param int Signal number.
 */
protected function onNotify($signo) {
  // Subclasses may override this hook.
}
/**
 * Hook called by execute() immediately before run(). The default
 * implementation does nothing.
 */
protected function willRun() {
  // Subclasses may override this hook.
}
/**
 * Hook called at the end of setTraceMode(). The default implementation does
 * nothing.
 */
protected function didSetTraceMode() {
  // Subclasses may override this hook.
}
/**
 * Write a message to stderr, tagged with a verbose marker and the daemon
 * class name. Does nothing unless verbose mode is enabled.
 *
 * @param string Message to write.
 */
final protected function log($message) {
if ($this->verbose) {
$daemon = get_class($this);
- fprintf(STDERR, "<%s> %s %s\n", '<VERB>', $daemon, $message);
+ fprintf(STDERR, "%s %s %s\n", '<VERB>', $daemon, $message);
}
}
// Report a caught signal on stderr as a "<SGNL>" line carrying the signal
// number and the name returned by phutil_get_signal_name().
// NOTE(review): PhutilDaemonHandle checks for an empty signal name before
// printing it; this helper prints the name unconditionally. Confirm an empty
// "()" in the output is acceptable for unknown signals.
+ private static function didCatchSignal($signo) {
+ $signame = phutil_get_signal_name($signo);
+ fprintf(
+ STDERR,
+ "%s Caught signal %s (%s).\n",
+ '<SGNL>',
+ $signo,
+ $signame);
+ }
+
/* -( Communicating With the Overseer )------------------------------------ */
/**
 * Start an output buffer which passes everything written to stdout through
 * didReceiveStdout().
 */
private function beginStdoutCapture() {
  $callback = array($this, 'didReceiveStdout');

  // The second argument is the output buffer chunk size.
  ob_start($callback, 2);
}
/**
 * Flush and close the output buffer opened by beginStdoutCapture().
 */
private function endStdoutCapture() {
  ob_end_flush();
}
/**
 * Output buffer callback installed by beginStdoutCapture(). Wraps captured
 * output in a stdout overseer message; empty output produces nothing.
 *
 * @param string Captured output.
 * @return string Encoded overseer message, or an empty string.
 */
public function didReceiveStdout($data) {
if (!strlen($data)) {
return '';
}
+
return $this->encodeOverseerMessage(self::MESSAGETYPE_STDOUT, $data);
}
/**
 * Serialize an overseer message as one line of JSON.
 *
 * The message is a JSON list whose first element is the message type. The
 * payload is appended as a second element unless it is `null`.
 *
 * @param string Message type.
 * @param wild|null Optional payload.
 * @return string JSON-encoded message followed by a newline.
 */
private function encodeOverseerMessage($type, $data) {
  $structure = ($data === null)
    ? array($type)
    : array($type, $data);

  return json_encode($structure)."\n";
}
/**
 * Send a message to the overseer on stdout.
 *
 * Stdout capture is suspended while the message is written so that the
 * message is not itself wrapped by didReceiveStdout().
 *
 * @param string Message type.
 * @param wild|null Optional payload.
 * @return void
 */
private function emitOverseerMessage($type, $data) {
  $this->endStdoutCapture();

  $line = $this->encodeOverseerMessage($type, $data);
  echo $line;

  $this->beginStdoutCapture();
}
/**
 * Error listener which echoes errors to stderr.
 *
 * If the caller has redirected the error log to a file, PHP won't output
 * messages to stderr, so the overseer can't capture them. This listener
 * writes them to stderr so the overseer is always aware of errors.
 *
 * @param wild Event type (unused).
 * @param wild Event value (unused).
 * @param array Event metadata; `default_message` and `trace` are read.
 * @return void
 */
public static function errorListener($event, $value, array $metadata) {
  $console = PhutilConsole::getConsole();

  $default_message = idx($metadata, 'default_message');
  if ($default_message) {
    $console->writeErr("%s\n", $default_message);
  }

  $raw_trace = idx($metadata, 'trace');
  if ($raw_trace) {
    $formatted = PhutilErrorHandler::formatStacktrace($raw_trace);
    $console->writeErr("%s\n", $formatted);
  }
}
/* -( Autoscaling )-------------------------------------------------------- */
/**
* Prepare to become busy. This may autoscale the pool up.
*
* This notifies the overseer that the daemon has become busy. If daemons
* that are part of an autoscale pool are continuously busy for a prolonged
* period of time, the overseer may scale up the pool.
*
* @return this
* @task autoscale
*/
protected function willBeginWork() {
  // Already busy: there is no state change to report.
  if ($this->workState == self::WORKSTATE_BUSY) {
    return $this;
  }

  $this->workState = self::WORKSTATE_BUSY;
  $this->idleSince = null;
  $this->emitOverseerMessage(self::MESSAGETYPE_BUSY, null);

  return $this;
}
/**
* Prepare to idle. This may autoscale the pool down.
*
* This notifies the overseer that the daemon is no longer busy. If daemons
* that are part of an autoscale pool are idle for a prolonged period of time,
* they may exit to scale the pool down.
*
* @return this
* @task autoscale
*/
protected function willBeginIdle() {
  // Already idle: keep the original idle timestamp and stay quiet.
  if ($this->workState == self::WORKSTATE_IDLE) {
    return $this;
  }

  $this->workState = self::WORKSTATE_IDLE;
  $this->idleSince = time();
  $this->emitOverseerMessage(self::MESSAGETYPE_IDLE, null);

  return $this;
}
/**
* Determine if this is a clone or the original daemon.
*
* @return bool True if this is a cloned autoscaling daemon.
* @task autoscale
*/
private function isClonedAutoscaleDaemon() {
  // The "clone" autoscale property defaults to false when it is not set.
  $is_clone = $this->getAutoscaleProperty('clone', false);
  return (bool)$is_clone;
}
/**
* Get the duration (in seconds) which a daemon must be continuously idle
* for before it should exit to scale the pool down.
*
* @return int Duration, in seconds.
* @task autoscale
*/
private function getAutoscaleDownDuration() {
  // The "down" autoscale property defaults to 15 when it is not set.
  $seconds = $this->getAutoscaleProperty('down', 15);
  return $seconds;
}
/**
* Configure autoscaling for this daemon.
*
* @param map<string, wild> Map of autoscale properties.
* @return this
* @task autoscale
*/
public function setAutoscaleProperties(array $autoscale_properties) {
  // Every property is optional; checkMap() validates the keys and types of
  // the ones which are present.
  $spec = array(
    'group' => 'optional string',
    'up' => 'optional int',
    'down' => 'optional int',
    'pool' => 'optional int',
    'clone' => 'optional bool',
    'reserve' => 'optional int|float',
  );
  PhutilTypeSpec::checkMap($autoscale_properties, $spec);

  $this->autoscaleProperties = $autoscale_properties;

  return $this;
}
/**
* Read autoscaling configuration for this daemon.
*
* @param string Property to read.
* @param wild Default value to return if the property is not set.
* @return wild Property value, or `$default` if one is not set.
* @task autoscale
*/
private function getAutoscaleProperty($key, $default = null) {
  $properties = $this->autoscaleProperties;
  return idx($properties, $key, $default);
}
}
diff --git a/src/daemon/PhutilDaemonHandle.php b/src/daemon/PhutilDaemonHandle.php
index 37858fb..3de873c 100644
--- a/src/daemon/PhutilDaemonHandle.php
+++ b/src/daemon/PhutilDaemonHandle.php
@@ -1,419 +1,422 @@
<?php
final class PhutilDaemonHandle extends Phobject {
const EVENT_DID_LAUNCH = 'daemon.didLaunch';
const EVENT_DID_LOG = 'daemon.didLogMessage';
const EVENT_DID_HEARTBEAT = 'daemon.didHeartbeat';
const EVENT_WILL_GRACEFUL = 'daemon.willGraceful';
const EVENT_WILL_EXIT = 'daemon.willExit';
private $overseer;
private $daemonClass;
private $argv;
private $config;
private $pid;
private $daemonID;
private $deadline;
private $heartbeat;
private $stdoutBuffer;
private $restartAt;
private $silent;
private $shouldRestart = true;
private $shouldShutdown;
private $future;
private $traceMemory;
/**
 * Create a handle for one daemon managed by an overseer and dispatch the
 * launch event. The daemon process itself is started later, by update().
 *
 * @param PhutilDaemonOverseer Overseer which owns this handle.
 * @param string Daemon class name.
 * @param array Arguments for the daemon executor command.
 * @param array Daemon configuration, written to the process as JSON.
 */
public function __construct(
  PhutilDaemonOverseer $overseer,
  $daemon_class,
  array $argv,
  array $config) {

  $this->overseer = $overseer;
  $this->daemonClass = $daemon_class;
  $this->argv = $argv;
  $this->config = $config;

  // Eligible for launch immediately.
  $this->restartAt = time();
  $this->daemonID = $this->generateDaemonID();

  $launch_params = array(
    'argv' => $this->argv,
    'explicitArgv' => idx($this->config, 'argv'),
  );
  $this->dispatchEvent(self::EVENT_DID_LAUNCH, $launch_params);
}
/**
 * Test whether this handle currently holds a process future.
 *
 * @return bool True while a future is set.
 */
public function isRunning() {
  $future = $this->future;
  return (bool)$future;
}
/**
 * Test whether this daemon is finished: it is not running and will not be
 * restarted.
 *
 * @return bool True if the daemon is stopped for good.
 */
public function isDone() {
  return !($this->shouldRestart || $this->isRunning());
}
/**
 * Get the future for the daemon process, if there is one.
 *
 * @return wild Current future, or null when no process is being tracked.
 */
public function getFuture() {
  $future = $this->future;
  return $future;
}
/**
 * Control whether logMessage() echoes messages. Log events are dispatched
 * either way.
 *
 * @param bool Truthy to suppress echoed log lines.
 * @return this
 */
public function setSilent($silent) {
  $this->silent = $silent;

  return $this;
}
/**
 * Read the silent flag.
 *
 * @return wild Whatever was last passed to setSilent().
 */
public function getSilent() {
  $silent = $this->silent;
  return $silent;
}
/**
 * Control whether update() logs the overseer's memory usage.
 *
 * @param bool Truthy to enable memory tracing.
 * @return this
 */
public function setTraceMemory($trace_memory) {
  $this->traceMemory = $trace_memory;

  return $this;
}
/**
 * Read the memory tracing flag.
 *
 * @return wild Whatever was last passed to setTraceMemory().
 */
public function getTraceMemory() {
  $trace_memory = $this->traceMemory;
  return $trace_memory;
}
/**
 * Advance the state of this daemon: start the process if it is due, relay
 * its output, react to process exit, and run heartbeat and hang checks.
 *
 * @return void
 */
public function update() {
$this->updateMemory();
// Not running: decide whether the process should be (re)started now.
if (!$this->isRunning()) {
if (!$this->shouldRestart) {
return;
}
// No restart is scheduled, or the scheduled time has not arrived yet.
if (!$this->restartAt || (time() < $this->restartAt)) {
return;
}
if ($this->shouldShutdown) {
return;
}
$this->startDaemonProcess();
}
$future = $this->future;
// $result stays null while the process is still running.
$result = null;
if ($future->isReady()) {
$result = $future->resolve();
}
// Consume whatever output is available, then drop the buffers.
list($stdout, $stderr) = $future->read();
$future->discardBuffers();
if (strlen($stdout)) {
$this->didReadStdout($stdout);
}
$stderr = trim($stderr);
if (strlen($stderr)) {
- $this->logMessage('STDE', $stderr);
+ foreach (phutil_split_lines($stderr, false) as $line) {
+ $this->logMessage('STDE', $line);
+ }
}
// The process has exited: log the outcome and either stop for good or
// schedule a restart.
if ($result !== null) {
list($err) = $result;
+
if ($err) {
- $this->logMessage('FAIL', pht('Process exited with error %s', $err));
+ $this->logMessage('FAIL', pht('Process exited with error %s.', $err));
} else {
$this->logMessage('DONE', pht('Process exited normally.'));
}
$this->future = null;
if ($this->shouldShutdown) {
$this->restartAt = null;
} else {
$this->scheduleRestart();
}
}
$this->updateHeartbeatEvent();
$this->updateHangDetection();
}
/**
 * Dispatch a heartbeat event once the next heartbeat time has been reached,
 * then schedule the following one.
 */
private function updateHeartbeatEvent() {
  $is_due = !($this->heartbeat > time());
  if (!$is_due) {
    return;
  }

  $this->heartbeat = time() + self::getHeartbeatEventFrequency();
  $this->dispatchEvent(self::EVENT_DID_HEARTBEAT);
}
/**
 * Kill and reschedule the daemon if it is running but its heartbeat deadline
 * has passed.
 */
private function updateHangDetection() {
  if ($this->isRunning() && (time() > $this->deadline)) {
    $this->logMessage('HANG', pht('Hang detected. Restarting process.'));
    $this->annihilateProcessGroup();
    $this->scheduleRestart();
  }
}
/**
 * Log that a restart is pending and set the time at which update() may
 * start the process again.
 */
private function scheduleRestart() {
  $this->logMessage('WAIT', pht('Waiting to restart process.'));

  $delay = self::getWaitBeforeRestart();
  $this->restartAt = time() + $delay;
}
/**
* Generate a unique ID for this daemon.
*
* @return string A unique daemon ID.
*/
private function generateDaemonID() {
  // The ID is the first 12 characters of "<pid>:<random characters>".
  $seed = getmypid().':'.Filesystem::readRandomCharacters(12);
  return substr($seed, 0, 12);
}
/**
 * Get the ID generated for this daemon when the handle was constructed.
 *
 * @return string Daemon ID.
 */
public function getDaemonID() {
  $id = $this->daemonID;
  return $id;
}
/**
 * Get the recorded PID of the daemon process.
 *
 * @return int|null PID, or null when no PID is recorded.
 */
public function getPID() {
  $pid = $this->pid;
  return $pid;
}
/**
 * Size limit applied to the stdout and stderr buffers of the daemon process.
 *
 * @return int Limit, in bytes.
 */
private function getCaptureBufferSize() {
  $limit = 65535;
  return $limit;
}
/**
 * Longest time a daemon may go without sending a heartbeat before hang
 * detection restarts it.
 *
 * @return int Duration, in seconds.
 */
private function getRequiredHeartbeatFrequency() {
  $seconds = 86400;
  return $seconds;
}
/**
 * Delay between a daemon exiting and being started again.
 *
 * @return int Duration, in seconds.
 */
public static function getWaitBeforeRestart() {
  $seconds = 5;
  return $seconds;
}
/**
 * Interval between heartbeat events dispatched by the handle.
 *
 * @return int Duration, in seconds.
 */
public static function getHeartbeatEventFrequency() {
  $seconds = 120;
  return $seconds;
}
/**
 * Pause between sending SIGTERM and SIGKILL when killing a process group.
 *
 * @return int Duration, in seconds.
 */
private function getKillDelay() {
  $seconds = 3;
  return $seconds;
}
/**
 * Get the working directory used when launching the daemon executor script.
 *
 * @return string Path to the executor directory, with a trailing slash.
 */
private function getDaemonCWD() {
  $library_root = phutil_get_library_root('phutil');
  return dirname($library_root).'/scripts/daemon/exec/';
}
/**
 * Build (but do not start) the future which runs the daemon executor script
 * for this daemon. The daemon configuration is queued on its stdin as JSON.
 *
 * @return ExecFuture Unstarted future.
 */
private function newExecFuture() {
  $limit = $this->getCaptureBufferSize();

  // NOTE: PHP implements proc_open() by running 'sh -c'. On most systems this
  // is bash, but on Ubuntu it's dash. When you proc_open() using bash, you
  // get one new process (the command you ran). When you proc_open() using
  // dash, you get two new processes: the command you ran and a parent
  // "dash -c" (or "sh -c") process. This means that the child process's PID
  // is actually the 'dash' PID, not the command's PID. To avoid this, use
  // 'exec' to replace the shell process with the real process; without this,
  // the child will call posix_getppid(), be given the pid of the 'sh -c'
  // process, and send it SIGUSR1 to keepalive which will terminate it
  // immediately. We also won't be able to do process group management because
  // the shell process won't properly posix_setsid() so the pgid of the child
  // won't be meaningful.
  $future = new ExecFuture(
    'exec ./exec_daemon.php %s %Ls',
    $this->daemonClass,
    $this->argv);

  return $future
    ->setCWD($this->getDaemonCWD())
    ->setStdoutSizeLimit($limit)
    ->setStderrSizeLimit($limit)
    ->write(json_encode($this->config));
}
/**
* Dispatch an event to event listeners.
*
* @param string Event type.
* @param dict Event parameters.
* @return void
*/
private function dispatchEvent($type, array $params = array()) {
  $identity = array(
    'id' => $this->daemonID,
    'daemonClass' => $this->daemonClass,
    'childPID' => $this->pid,
  );

  // Array union keeps the left-hand value for duplicate keys, so callers
  // cannot override the identity fields.
  $event = new PhutilEvent($type, $identity + $params);

  // A failing listener is logged rather than allowed to propagate.
  try {
    PhutilEventEngine::dispatchEvent($event);
  } catch (Exception $ex) {
    phlog($ex);
  }
}
/**
 * Forcibly stop the daemon's entire process group: send SIGTERM, wait for
 * the kill delay, then send SIGKILL. The recorded PID is cleared afterwards.
 *
 * Does nothing when no PID is recorded or the process group cannot be
 * determined.
 *
 * @return void
 */
private function annihilateProcessGroup() {
  $pid = $this->pid;

  // Check the PID before looking up the process group: an empty PID must
  // never reach posix_getpgid(), where 0 means "the calling process".
  if (!$pid) {
    return;
  }

  $pgid = posix_getpgid($pid);
  if (!$pgid) {
    return;
  }

  // A negative PID addresses every process in the group.
  posix_kill(-$pgid, SIGTERM);
  sleep($this->getKillDelay());
  posix_kill(-$pgid, SIGKILL);

  $this->pid = null;
}
/**
 * When memory tracing is enabled, log the current memory usage of the
 * overseer process.
 */
private function updateMemory() {
  if (!$this->traceMemory) {
    return;
  }

  $usage = pht(
    'Overseer Memory Usage: %s KB',
    new PhutilNumber(memory_get_usage() / 1024, 1));

  $this->logMessage('RAMS', $usage);
}
/**
 * Launch the daemon process: reset the hang deadline, the heartbeat timer
 * and the stdout buffer, then start a new future and record its PID.
 */
private function startDaemonProcess() {
  $this->logMessage('INIT', pht('Starting process.'));

  $this->deadline = time() + $this->getRequiredHeartbeatFrequency();
  $this->heartbeat = time() + self::getHeartbeatEventFrequency();
  $this->stdoutBuffer = '';

  // Store the future before starting it, so the handle tracks the process
  // from the moment it is launched.
  $future = $this->newExecFuture();
  $this->future = $future;
  $future->start();

  $this->pid = $future->getPID();
}
/**
 * Process output read from the daemon's stdout.
 *
 * Output is buffered and split on newlines. Each complete line is expected
 * to be a JSON list whose first element is one of the
 * `PhutilDaemon::MESSAGETYPE_*` constants. Lines which cannot be parsed, or
 * which carry an unknown type, are logged as malformed.
 *
 * @param string Newly read stdout data.
 * @return void
 */
private function didReadStdout($data) {
  $this->stdoutBuffer .= $data;

  while (true) {
    $pos = strpos($this->stdoutBuffer, "\n");
    if ($pos === false) {
      // No complete line yet; keep the remainder for the next read.
      break;
    }

    $message = substr($this->stdoutBuffer, 0, $pos);
    $this->stdoutBuffer = substr($this->stdoutBuffer, $pos + 1);

    try {
      $structure = phutil_json_decode($message);
    } catch (PhutilJSONParserException $ex) {
      $structure = array();
    }

    // switch compares loosely, so a non-string type such as `true` (or `0`
    // on older PHP versions) would match a string case label. Only strings
    // are valid message types; anything else falls through to "malformed".
    $type = idx($structure, 0);
    if (!is_string($type)) {
      $type = null;
    }

    switch ($type) {
      case PhutilDaemon::MESSAGETYPE_STDOUT:
        $this->logMessage('STDO', idx($structure, 1));
        break;
      case PhutilDaemon::MESSAGETYPE_HEARTBEAT:
        $this->deadline = time() + $this->getRequiredHeartbeatFrequency();
        break;
      case PhutilDaemon::MESSAGETYPE_BUSY:
        $this->overseer->didBeginWork($this);
        break;
      case PhutilDaemon::MESSAGETYPE_IDLE:
        $this->overseer->didBeginIdle($this);
        break;
      case PhutilDaemon::MESSAGETYPE_DOWN:
        // The daemon is exiting because it doesn't have enough work and it
        // is trying to scale the pool down. We should not restart it.
        $this->shouldRestart = false;
        $this->shouldShutdown = true;
        break;
      default:
        // If we can't parse this or it isn't a message we understand, just
        // emit the raw message.
        $this->logMessage('STDO', pht('<Malformed> %s', $message));
        break;
    }
  }
}
/**
 * Forward a notify signal to the daemon process, if a PID is recorded.
 *
 * @param int Signal number to forward.
 * @return void
 */
public function didReceiveNotifySignal($signo) {
  $pid = $this->pid;
  if (!$pid) {
    return;
  }

  posix_kill($pid, $signo);
}
/**
 * Handle a reload signal: log it, then ask the daemon process to stop
 * gracefully so that it is restarted afterwards.
 *
 * @param int Signal number which was received.
 * @return void
 */
public function didReceiveReloadSignal($signo) {
  $signame = phutil_get_signal_name($signo);
  if ($signame) {
    $sigmsg = pht(
      'Reloading in response to signal %d (%s).',
      $signo,
      $signame);
  } else {
    $sigmsg = pht(
      'Reloading in response to signal %d.',
      $signo);
  }

  $this->logMessage('RELO', $sigmsg, $signo);

  // This signal means "stop the current process gracefully, then launch
  // a new identical process once it exits". This can be used to update
  // daemons after code changes (the new processes will run the new code)
  // without aborting any running tasks.

  // We SIGINT the daemon but don't set the shutdown flag, so it will
  // naturally be restarted after it exits, as though it had exited after an
  // unhandled exception.

  // Only signal when a PID is recorded. With an empty PID, posix_kill()
  // would target PID 0, which addresses every process in the caller's own
  // process group.
  $pid = $this->pid;
  if ($pid) {
    posix_kill($pid, SIGINT);
  }
}
/**
 * Handle a graceful shutdown signal: mark the daemon as shutting down and
 * not to be restarted, log the event, then ask the process to stop.
 *
 * @param int Signal number which was received.
 * @return void
 */
public function didReceiveGracefulSignal($signo) {
  $this->shouldShutdown = true;
  $this->shouldRestart = false;

  $signame = phutil_get_signal_name($signo);
  if ($signame) {
    $sigmsg = pht(
      'Graceful shutdown in response to signal %d (%s).',
      $signo,
      $signame);
  } else {
    $sigmsg = pht(
      'Graceful shutdown in response to signal %d.',
      $signo);
  }

  $this->logMessage('DONE', $sigmsg, $signo);

  // Only signal when a PID is recorded. With an empty PID, posix_kill()
  // would target PID 0, which addresses every process in the caller's own
  // process group.
  $pid = $this->pid;
  if ($pid) {
    posix_kill($pid, SIGINT);
  }
}
/**
 * Handle a terminal signal: mark the daemon as shutting down and not to be
 * restarted, log the event, then kill the daemon's process group.
 *
 * @param int Signal number which was received.
 * @return void
 */
public function didReceiveTerminalSignal($signo) {
  $this->shouldShutdown = true;
  $this->shouldRestart = false;

  $signame = phutil_get_signal_name($signo);
  $sigmsg = $signame
    ? pht(
      'Shutting down in response to signal %s (%s).',
      $signo,
      $signame)
    : pht('Shutting down in response to signal %s.', $signo);

  $this->logMessage('EXIT', $sigmsg, $signo);
  $this->annihilateProcessGroup();
}
/**
 * Record a log message: echo a timestamped line unless the handle is silent,
 * and always dispatch a log event.
 *
 * @param string Short message type tag, like `STDO` or `FAIL`.
 * @param string Message text.
 * @param wild Optional context, passed through to the event.
 * @return void
 */
private function logMessage($type, $message, $context = null) {
  if (!$this->getSilent()) {
    $timestamp = date('Y-m-d g:i:s A');
    echo $timestamp.' ['.$type.'] '.$message."\n";
  }

  $event_params = array(
    'type' => $type,
    'message' => $message,
    'context' => $context,
  );
  $this->dispatchEvent(self::EVENT_DID_LOG, $event_params);
}
/**
 * Dispatch the "will exit" event for this daemon.
 *
 * @return void
 */
public function didRemoveDaemon() {
  $type = self::EVENT_WILL_EXIT;
  $this->dispatchEvent($type);
}
}
diff --git a/src/future/exec/ExecFuture.php b/src/future/exec/ExecFuture.php
index 23bdb21..11ecca1 100644
--- a/src/future/exec/ExecFuture.php
+++ b/src/future/exec/ExecFuture.php
@@ -1,904 +1,915 @@
<?php
/**
* Execute system commands in parallel using futures.
*
* ExecFuture is a future, which means it runs asynchronously and represents
* a value which may not exist yet. See @{article:Using Futures} for an
* explanation of futures. When an ExecFuture resolves, it returns the exit
* code, stdout and stderr of the process it executed.
*
* ExecFuture is the core command execution implementation in libphutil, but is
* exposed through a number of APIs. See @{article:Command Execution} for more
* discussion about executing system commands.
*
* @task create Creating ExecFutures
* @task resolve Resolving Execution
* @task config Configuring Execution
* @task info Command Information
* @task interact Interacting With Commands
* @task internal Internals
*/
final class ExecFuture extends PhutilExecutableFuture {
private $pipes = array();
private $proc = null;
private $start = null;
private $timeout = null;
private $procStatus = null;
private $stdout = null;
private $stderr = null;
private $stdin = null;
private $closePipe = true;
private $stdoutPos = 0;
private $stderrPos = 0;
private $command = null;
private $readBufferSize;
private $stdoutSizeLimit = PHP_INT_MAX;
private $stderrSizeLimit = PHP_INT_MAX;
private $profilerCallID;
private $killedByTimeout;
private $useWindowsFileStreams = false;
private $windowsStdoutTempFile = null;
private $windowsStderrTempFile = null;
private static $descriptorSpec = array(
0 => array('pipe', 'r'), // stdin
1 => array('pipe', 'w'), // stdout
2 => array('pipe', 'w'), // stderr
);
/* -( Creating ExecFutures )----------------------------------------------- */
/**
* Create a new ExecFuture.
*
* $future = new ExecFuture('wc -l %s', $file_path);
*
* @param string `sprintf()`-style command string which will be passed
* through @{function:csprintf} with the rest of the arguments.
* @param ... Zero or more additional arguments for @{function:csprintf}.
* @return ExecFuture ExecFuture for running the specified command.
* @task create
*/
public function __construct($command) {
  // Every argument, including the pattern itself, is handed to csprintf().
  $this->command = call_user_func_array('csprintf', func_get_args());

  $this->stdin = new PhutilRope();
}
/* -( Command Information )------------------------------------------------ */
/**
* Retrieve the raw command to be executed.
*
* @return string Raw command.
* @task info
*/
public function getCommand() {
  $command = $this->command;
  return $command;
}
/**
* Retrieve the byte limit for the stderr buffer.
*
* @return int Maximum buffer size, in bytes.
* @task info
*/
public function getStderrSizeLimit() {
  $limit = $this->stderrSizeLimit;
  return $limit;
}
/**
* Retrieve the byte limit for the stdout buffer.
*
* @return int Maximum buffer size, in bytes.
* @task info
*/
public function getStdoutSizeLimit() {
  $limit = $this->stdoutSizeLimit;
  return $limit;
}
/**
* Get the process's pid. This only works after execution is initiated, e.g.
* by a call to start().
*
* @return int Process ID of the executing process.
* @task info
*/
public function getPID() {
  // The PID is read from the process status map.
  $process_status = $this->procGetStatus();
  $pid = $process_status['pid'];
  return $pid;
}
/* -( Configuring Execution )---------------------------------------------- */
/**
* Set a maximum size for the stdout read buffer. To limit stderr, see
* @{method:setStderrSizeLimit}. The major use of these methods is to use less
* memory if you are running a command which sometimes produces huge volumes
* of output that you don't really care about.
*
* NOTE: Setting this to 0 means "no buffer", not "unlimited buffer".
*
* @param int Maximum size of the stdout read buffer.
* @return this
* @task config
*/
public function setStdoutSizeLimit($limit) {
  $this->stdoutSizeLimit = $limit;

  return $this;
}
/**
* Set a maximum size for the stderr read buffer.
* See @{method:setStdoutSizeLimit} for discussion.
*
* @param int Maximum size of the stderr read buffer.
* @return this
* @task config
*/
public function setStderrSizeLimit($limit) {
  $this->stderrSizeLimit = $limit;

  return $this;
}
/**
* Set the maximum internal read buffer size for this future. The future will
* block reads once the internal stdout or stderr buffer exceeds this size.
*
* NOTE: If you @{method:resolve} a future with a read buffer limit, you may
* block forever!
*
* TODO: We should probably release the read buffer limit during
* @{method:resolve}, or otherwise detect this. For now, be careful.
*
* @param int|null Maximum buffer size, or `null` for unlimited.
* @return this
*/
public function setReadBufferSize($read_buffer_size) {
  $this->readBufferSize = $read_buffer_size;

  return $this;
}
/**
* Set whether to use non-blocking streams on Windows.
*
* @param bool Whether to use non-blocking streams.
* @return this
* @task config
*/
public function setUseWindowsFileStreams($use_streams) {
  // The setting is ignored on other platforms.
  if (!phutil_is_windows()) {
    return $this;
  }

  $this->useWindowsFileStreams = $use_streams;
  return $this;
}
/* -( Interacting With Commands )------------------------------------------ */
/**
* Read and return output from stdout and stderr, if any is available. This
* method keeps a read cursor on each stream, but the entire streams are
* still returned when the future resolves. You can call read() again after
* resolving the future to retrieve only the parts of the streams you did not
* previously read:
*
* $future = new ExecFuture('...');
* // ...
* list($stdout) = $future->read(); // Returns output so far
* list($stdout) = $future->read(); // Returns new output since first call
* // ...
* list($stdout) = $future->resolvex(); // Returns ALL output
* list($stdout) = $future->read(); // Returns unread output
*
* NOTE: If you set a limit with @{method:setStdoutSizeLimit} or
* @{method:setStderrSizeLimit}, this method will not be able to read data
* past the limit.
*
* NOTE: If you call @{method:discardBuffers}, all the stdout/stderr data
* will be thrown away and the cursors will be reset.
*
* @return pair <$stdout, $stderr> pair with new output since the last call
* to this method.
* @task interact
*/
public function read() {
  // Read stdout first: readStdout() syncs a started future before the
  // stderr buffer is examined.
  $stdout = $this->readStdout();

  $stderr = (string)substr($this->stderr, $this->stderrPos);
  $this->stderrPos = strlen($this->stderr);

  return array($stdout, $stderr);
}
/**
 * Read and return new stdout output since the last read, advancing the
 * stdout read cursor.
 *
 * @return string New stdout output, possibly empty.
 */
public function readStdout() {
  // Sync a started future before reading from the buffer.
  if ($this->start) {
    $this->isReady();
  }

  $unread = (string)substr($this->stdout, $this->stdoutPos);
  $this->stdoutPos = strlen($this->stdout);

  return $unread;
}
/**
* Write data to stdin of the command.
*
* @param string Data to write.
* @param bool If true, keep the pipe open for writing. By default, the pipe
* will be closed as soon as possible so that commands which
* listen for EOF will execute. If you want to keep the pipe open
* past the start of command execution, do an empty write with
* `$keep_pipe = true` first.
* @return this
* @task interact
*/
public function write($data, $keep_pipe = false) {
  $has_data = strlen($data);

  // An empty write is allowed even without a pipe; it only updates the
  // keep-open setting below.
  if ($has_data && !$this->stdin) {
    throw new Exception(pht('Writing to a closed pipe!'));
  }

  if ($has_data) {
    $this->stdin->append($data);
  }

  $this->closePipe = !$keep_pipe;

  return $this;
}
/**
* Permanently discard the stdout and stderr buffers and reset the read
* cursors. This is basically useful only if you are streaming a large amount
* of data from some process:
*
* $future = new ExecFuture('zcat huge_file.gz');
* do {
* $done = $future->resolve(0.1); // Every 100ms,
* list($stdout) = $future->read(); // read output...
* echo $stdout; // send it somewhere...
* $future->discardBuffers(); // and then free the buffers.
* } while ($done === null);
*
* Conceivably you might also need to do this if you're writing a client using
* @{class:ExecFuture} and `netcat`, but you probably should not do that.
*
* NOTE: This completely discards the data. It won't be available when the
* future resolves. This is almost certainly only useful if you need the
* buffer memory for some reason.
*
* @return this
* @task interact
*/
public function discardBuffers() {
  $this->discardStdoutBuffer();

  // Drop stderr and rewind its read cursor as well.
  $this->stderrPos = 0;
  $this->stderr = '';

  return $this;
}
/**
 * Discard the stdout buffer and reset the stdout read cursor.
 *
 * @return this
 */
public function discardStdoutBuffer() {
  $this->stdoutPos = 0;
  $this->stdout = '';

  return $this;
}
/**
* Returns true if this future was killed by a timeout configured with
* @{method:setTimeout}.
*
* @return bool True if the future was killed for exceeding its time limit.
*/
public function getWasKilledByTimeout() {
  $was_killed = $this->killedByTimeout;
  return $was_killed;
}
/* -( Configuring Execution )---------------------------------------------- */
/**
 * Configure a hard upper bound on how long the command may run. A command
 * which runs for longer is killed, and the future resolves with an error
 * code. Use @{method:getWasKilledByTimeout} to find out whether that
 * happened.
 *
 * @param int Maximum number of seconds this command may execute for.
 * @return this
 * @task config
 */
public function setTimeout($seconds) {
  $this->timeout = $seconds;

  return $this;
}
/* -( Resolving Execution )------------------------------------------------ */
/**
 * Resolve a command which is expected to exit with return code 0. Behaves
 * like @{method:resolve}, except that a nonempty $err raises an exception and
 * only $stdout and $stderr are returned. See also @{function:execx}.
 *
 *   list($stdout, $stderr) = $future->resolvex();
 *
 * @param  float Optional timeout after which resolution will pause and
 *               execution will return to the caller.
 * @return pair  <$stdout, $stderr> pair.
 * @task resolve
 */
public function resolvex($timeout = null) {
  $result = $this->resolve($timeout);
  list($err, $stdout, $stderr) = $result;

  // Success path first: no error code means we just hand back the output.
  if (!$err) {
    return array($stdout, $stderr);
  }

  throw new CommandException(
    pht('Command failed with error #%d!', $err),
    $this->command,
    $err,
    $stdout,
    $stderr);
}
/**
 * Resolve a command you expect to return valid JSON. Works like
 * @{method:resolvex}, but also throws if stderr is nonempty, or stdout is not
 * valid JSON. Returns a PHP array, decoded from the JSON command output.
 *
 * A CommandException is raised when the command writes anything to stderr or
 * when stdout cannot be decoded as JSON; in both cases the exception carries
 * the command, an error code of 0, and the captured stdout and stderr.
 *
 * @param  float Optional timeout after which resolution will pause and
 *               execution will return to the caller.
 * @return array PHP array, decoded from JSON command output.
 * @task resolve
 */
public function resolveJSON($timeout = null) {
  list($stdout, $stderr) = $this->resolvex($timeout);
  $cmd = $this->command;

  if (strlen($stderr)) {
    // The stderr payload is text, so it has to be rendered with "%s". The
    // previous "%d" conversion coerced it to an integer, which hid the
    // actual error output from the exception message.
    throw new CommandException(
      pht(
        "JSON command '%s' emitted text to stderr when none was expected: %s",
        $cmd,
        $stderr),
      $cmd,
      0,
      $stdout,
      $stderr);
  }

  try {
    return phutil_json_decode($stdout);
  } catch (PhutilJSONParserException $ex) {
    throw new CommandException(
      pht(
        "JSON command '%s' did not produce a valid JSON object on stdout: %s",
        $cmd,
        $stdout),
      $cmd,
      0,
      $stdout,
      $stderr);
  }
}
/**
 * Resolve the process by abruptly terminating it.
 *
 * If the future has no result yet, the subprocess (when one exists) is sent
 * SIGKILL, the result is recorded with an error code of 128 plus the signal
 * number together with whatever stdout and stderr have been buffered so far,
 * and the process resources are released. If a result already exists it is
 * returned unchanged.
 *
 * @return list List of <err, stdout, stderr> results.
 * @task resolve
 */
public function resolveKill() {
  if (!$this->result) {
    // SIGKILL is only defined when the pcntl extension is loaded; fall back
    // to its conventional numeric value otherwise.
    if (defined('SIGKILL')) {
      $signal = SIGKILL;
    } else {
      $signal = 9;
    }

    // The process handle is empty if the future was never started or has
    // already been closed. There is nothing to signal in that case, and
    // handing an empty handle to proc_terminate() is an error.
    if ($this->proc) {
      proc_terminate($this->proc, $signal);
    }

    $this->result = array(
      128 + $signal,
      $this->stdout,
      $this->stderr,
    );

    $this->closeProcess();
  }

  return $this->result;
}
/* -( Internals )---------------------------------------------------------- */
/**
 * Hand the future core the output pipes it should watch for readability:
 * the stdout and stderr pipes which are set and have not yet hit EOF.
 *
 * @return list List of read sockets.
 * @task internal
 */
public function getReadSockets() {
  list($unused_stdin, $out_pipe, $err_pipe) = $this->pipes;

  $readable = array();
  foreach (array($out_pipe, $err_pipe) as $pipe) {
    if ($pipe !== null && !feof($pipe)) {
      $readable[] = $pipe;
    }
  }

  return $readable;
}
/**
 * Hand the future core the pipes it should watch for writability. The stdin
 * pipe is only reported while it is set, there is buffered input waiting to
 * be written, and the pipe has not reached EOF.
 *
 * @return list List of write sockets.
 * @task internal
 */
public function getWriteSockets() {
  list($in_pipe, $unused_stdout, $unused_stderr) = $this->pipes;

  if (!isset($in_pipe)) {
    return array();
  }

  if (!$this->stdin->getByteLength()) {
    return array();
  }

  if (feof($in_pipe)) {
    return array();
  }

  return array($in_pipe);
}
/**
 * Check whether the stdout buffer currently holds no bytes.
 *
 * @return bool True if the read buffer is empty.
 * @task internal
 */
public function isReadBufferEmpty() {
  $buffered_bytes = strlen($this->stdout);
  return !$buffered_bytes;
}
/**
 * Check whether there are no bytes waiting to be written to stdin.
 *
 * @return bool True if the write buffer is empty.
 * @task internal
 */
public function isWriteBufferEmpty() {
  $pending_bytes = $this->getWriteBufferSize();
  return !$pending_bytes;
}
/**
 * Count the bytes still waiting in the stdin write buffer. When the buffer
 * no longer exists, the size is reported as 0.
 *
 * @return int Number of bytes in the write buffer.
 * @task internal
 */
public function getWriteBufferSize() {
  return $this->stdin
    ? $this->stdin->getByteLength()
    : 0;
}
/**
 * Reads some bytes from a stream, discarding output once a certain amount
 * has been accumulated.
 *
 * Data is read in chunks of at most 64KB. Up to $limit bytes are kept and
 * returned; anything read beyond that is thrown away. Reading stops when the
 * stream yields no more bytes or when $length bytes have been collected.
 * While data is still being kept, each read is capped to the remaining
 * $length budget so the returned string never exceeds $length.
 *
 * @param  resource Stream to read from.
 * @param  int      Maximum number of bytes to return from $stream. If
 *                  additional bytes are available, they will be read and
 *                  discarded.
 * @param  string   Human-readable description of stream, for exception
 *                  message.
 * @param  int      Maximum number of bytes to read.
 * @return string   The data read from the stream.
 * @task internal
 */
private function readAndDiscard($stream, $limit, $description, $length) {
  $output = '';

  if ($length <= 0) {
    return '';
  }

  $chunk_size = 64 * 1024;

  do {
    if ($limit > 0) {
      // We are still keeping data, so never request more than what is left
      // of the caller's budget. Always asking for a full chunk here allowed
      // the final read to push the result past $length.
      $want = min($length - strlen($output), $chunk_size);
    } else {
      // The retention limit is exhausted and everything read from here on
      // is discarded, so drain the stream in full-size chunks.
      $want = min($length, $chunk_size);
    }

    $data = fread($stream, $want);
    if (false === $data) {
      throw new Exception(pht('Failed to read from %s', $description));
    }

    $read_bytes = strlen($data);

    if ($read_bytes > 0 && $limit > 0) {
      // Keep only as much of this chunk as the retention limit allows.
      if ($read_bytes > $limit) {
        $data = substr($data, 0, $limit);
      }
      $output .= $data;
      $limit -= strlen($data);
    }

    if (strlen($output) >= $length) {
      break;
    }
  } while ($read_bytes > 0);

  return $output;
}
/**
 * Begin or continue command execution.
 *
 * The first call (made while no pipes exist yet) launches the subprocess
 * with `proc_open()` and returns false. Later calls write pending stdin,
 * read available stdout and stderr into the buffers, and record the result
 * once the process is no longer running and both output streams have
 * reached EOF. If a timeout is configured and has elapsed, the process is
 * killed with @{method:resolveKill}.
 *
 * Throws an Exception if the process cannot be started, if the Windows
 * temporary stream files cannot be opened, if the pipes cannot be made
 * nonblocking, or if a write to stdin fails.
 *
 * NOTE(review): when the process is still running and the timeout has not
 * fired, execution reaches the end of this method without an explicit
 * `return`, so the caller receives null rather than false.
 *
 * @return bool True if future has resolved.
 * @task internal
 */
public function isReady() {
// NOTE: We have soft dependencies on PhutilServiceProfiler and
// PhutilErrorTrap here. These dependencies are soft to avoid the need to
// build them into the Phage agent. Under normal circumstances, these
// classes are always available.
// No pipes yet means the subprocess has not been launched; do that now.
if (!$this->pipes) {
// NOTE: See note above about Phage.
if (class_exists('PhutilServiceProfiler')) {
$profiler = PhutilServiceProfiler::getInstance();
$this->profilerCallID = $profiler->beginServiceCall(
array(
'type' => 'exec',
'command' => (string)$this->command,
));
}
if (!$this->start) {
// We might already have started the timer via initiating resolution.
$this->start = microtime(true);
}
$unmasked_command = $this->command;
if ($unmasked_command instanceof PhutilCommandString) {
$unmasked_command = $unmasked_command->getUnmaskedString();
}
$pipes = array();
if (phutil_is_windows()) {
// See T4395. proc_open under Windows uses "cmd /C [cmd]", which will
// strip the first and last quote when there aren't exactly two quotes
// (and some other conditions as well). This results in a command that
// looks like `command" "path to my file" "something something` which is
// clearly wrong. By surrounding the command string with quotes we can
// be sure this process is harmless.
if (strpos($unmasked_command, '"') !== false) {
$unmasked_command = '"'.$unmasked_command.'"';
}
}
if ($this->hasEnv()) {
$env = $this->getEnv();
} else {
$env = null;
}
$cwd = $this->getCWD();
// NOTE: See note above about Phage.
if (class_exists('PhutilErrorTrap')) {
$trap = new PhutilErrorTrap();
} else {
$trap = null;
}
$spec = self::$descriptorSpec;
// With Windows file streams enabled, stdout and stderr are written to
// temporary files instead of the default descriptors.
if ($this->useWindowsFileStreams) {
$this->windowsStdoutTempFile = new TempFile();
$this->windowsStderrTempFile = new TempFile();
$spec = array(
0 => self::$descriptorSpec[0], // stdin
1 => fopen($this->windowsStdoutTempFile, 'wb'), // stdout
2 => fopen($this->windowsStderrTempFile, 'wb'), // stderr
);
if (!$spec[1] || !$spec[2]) {
throw new Exception(pht(
'Unable to create temporary files for '.
'Windows stdout / stderr streams'));
}
}
$proc = @proc_open(
$unmasked_command,
$spec,
$pipes,
$cwd,
$env);
// Close our write handles on the temporary files and reopen the same files
// for reading, so they can stand in for the stdout and stderr pipes.
if ($this->useWindowsFileStreams) {
fclose($spec[1]);
fclose($spec[2]);
$pipes = array(
0 => head($pipes), // stdin
1 => fopen($this->windowsStdoutTempFile, 'rb'), // stdout
2 => fopen($this->windowsStderrTempFile, 'rb'), // stderr
);
if (!$pipes[1] || !$pipes[2]) {
throw new Exception(pht(
'Unable to open temporary files for '.
'reading Windows stdout / stderr streams'));
}
}
// Collect whatever error proc_open() raised, for the failure message below.
if ($trap) {
$err = $trap->getErrorsAsString();
$trap->destroy();
} else {
$err = error_get_last();
}
if (!is_resource($proc)) {
throw new Exception(
pht(
'Failed to `%s`: %s',
'proc_open()',
$err));
}
$this->pipes = $pipes;
$this->proc = $proc;
list($stdin, $stdout, $stderr) = $pipes;
if (!phutil_is_windows()) {
// On Windows, we redirect process standard output and standard error
// through temporary files, and then use stream_select to determine
// if there's more data to read.
if ((!stream_set_blocking($stdout, false)) ||
(!stream_set_blocking($stderr, false)) ||
(!stream_set_blocking($stdin, false))) {
$this->__destruct();
throw new Exception(pht('Failed to set streams nonblocking.'));
}
}
$this->tryToCloseStdin();
return false;
}
// The process handle is cleared by closeProcess(), so an empty handle at
// this point means the future has already resolved.
if (!$this->proc) {
return true;
}
list($stdin, $stdout, $stderr) = $this->pipes;
// Write as much buffered stdin as the pipe will accept right now.
while (isset($this->stdin) && $this->stdin->getByteLength()) {
$write_segment = $this->stdin->getAnyPrefix();
$bytes = fwrite($stdin, $write_segment);
if ($bytes === false) {
throw new Exception(pht('Unable to write to stdin!'));
} else if ($bytes) {
$this->stdin->removeBytesFromHead($bytes);
} else {
// Writes are blocked for now.
break;
}
}
$this->tryToCloseStdin();
// Read status before reading pipes so that we can never miss data that
// arrives between our last read and the process exiting.
$status = $this->procGetStatus();
// When a read buffer size is configured, only read enough to fill each
// buffer up to that size; otherwise the read budget is PHP_INT_MAX.
$read_buffer_size = $this->readBufferSize;
$max_stdout_read_bytes = PHP_INT_MAX;
$max_stderr_read_bytes = PHP_INT_MAX;
if ($read_buffer_size !== null) {
$max_stdout_read_bytes = $read_buffer_size - strlen($this->stdout);
$max_stderr_read_bytes = $read_buffer_size - strlen($this->stderr);
}
if ($max_stdout_read_bytes > 0) {
$this->stdout .= $this->readAndDiscard(
$stdout,
$this->getStdoutSizeLimit() - strlen($this->stdout),
'stdout',
$max_stdout_read_bytes);
}
if ($max_stderr_read_bytes > 0) {
$this->stderr .= $this->readAndDiscard(
$stderr,
$this->getStderrSizeLimit() - strlen($this->stderr),
'stderr',
$max_stderr_read_bytes);
}
$is_done = false;
if (!$status['running']) {
// We may still have unread bytes on stdout or stderr, particularly if
// this future is being buffered and streamed. If we do, we don't want to
// consider the subprocess to have exited until we've read everything.
// See T9724 for context.
if (feof($stdout) && feof($stderr)) {
$is_done = true;
}
}
if ($is_done) {
if ($this->useWindowsFileStreams) {
fclose($stdout);
fclose($stderr);
}
+ // If the subprocess got nuked with `kill -9`, we get a -1 exitcode.
+ // Upgrade this to a slightly more informative value by examining the
+ // terminating signal code.
+ $err = $status['exitcode'];
+ if ($err == -1) {
+ if ($status['signaled']) {
+ $err = 128 + $status['termsig'];
+ }
+ }
+
$this->result = array(
- $status['exitcode'],
+ $err,
$this->stdout,
$this->stderr,
);
$this->closeProcess();
return true;
}
// Still running: enforce the configured timeout, if any, by killing the
// process once the elapsed time reaches it.
$elapsed = (microtime(true) - $this->start);
if ($this->timeout && ($elapsed >= $this->timeout)) {
$this->killedByTimeout = true;
$this->resolveKill();
return true;
}
}
/**
 * Clean up the subprocess when the future is destroyed. A process which is
 * still running is killed; one which has already stopped is simply closed.
 * Does nothing when there is no process handle.
 *
 * @return void
 * @task internal
 */
public function __destruct() {
  if ($this->proc) {
    // NOTE: Calling proc_close() on a process which is still running hangs
    // indefinitely, so a live process has to be killed explicitly first.
    $status = $this->procGetStatus();

    if ($status['running']) {
      $this->resolveKill();
    } else {
      $this->closeProcess();
    }
  }
}
/**
 * Release everything associated with the subprocess: close any pipes which
 * are still set, close the process handle, drop the stdin buffer, and end
 * the profiler service call if one was started.
 *
 * @return void
 * @task internal
 */
private function closeProcess() {
  foreach ($this->pipes as $handle) {
    if ($handle === null) {
      continue;
    }
    @fclose($handle);
  }
  $this->pipes = array(null, null, null);

  if ($this->proc) {
    @proc_close($this->proc);
    $this->proc = null;
  }

  $this->stdin = null;

  if ($this->profilerCallID === null) {
    // No service call was started, so there is nothing to report.
    return;
  }

  // Report the error code from the result, when a result has been recorded.
  $err = $this->result ? idx($this->result, 0) : null;

  $profiler = PhutilServiceProfiler::getInstance();
  $profiler->endServiceCall(
    $this->profilerCallID,
    array(
      'err' => $err,
    ));

  $this->profilerCallID = null;
}
/**
 * Execute `proc_get_status()`, but avoid pitfalls.
 *
 * The most recent status is cached. Once a cached status reports that the
 * process is no longer running, that cached status is returned on every
 * later call instead of querying the process again.
 *
 * @return dict Process status.
 * @task internal
 */
private function procGetStatus() {
// After the process exits, we only get one chance to read proc_get_status()
// before it starts returning garbage. Make sure we don't throw away the
// last good read.
if ($this->procStatus) {
if (!$this->procStatus['running']) {
return $this->procStatus;
}
}
// Either there is no cached status yet or the process was still running at
// the last check, so ask again and remember the answer.
$this->procStatus = proc_get_status($this->proc);
+
return $this->procStatus;
}
/**
 * Close the stdin pipe once we have no further use for it, so that a
 * subprocess waiting for EOF on its input does not leave us hanging.
 *
 * @return void
 * @task internal
 */
private function tryToCloseStdin() {
  // The pipe has to stay open if a call to write(..., true) asked us to keep
  // it, or if there are still buffered bytes waiting to be written.
  $must_stay_open = !$this->closePipe || $this->stdin->getByteLength();
  if ($must_stay_open) {
    return;
  }

  list($in_pipe) = $this->pipes;

  // An empty slot means stdin was already closed earlier.
  if ($in_pipe) {
    @fclose($in_pipe);
    $this->pipes[0] = null;
  }
}
/**
 * Compute how long to wait by default. Starts from the parent's default
 * wait and, when a timeout is configured, shortens it so it never extends
 * past the time remaining before the timeout (and never drops below zero).
 * Starts the execution timer if it has not been started yet.
 *
 * @return wild Default wait, bounded by the remaining timeout if one is set.
 */
public function getDefaultWait() {
  $wait = parent::getDefaultWait();

  if (!$this->timeout) {
    return $wait;
  }

  if (!$this->start) {
    $this->start = microtime(true);
  }

  $elapsed = (microtime(true) - $this->start);
  $remaining = $this->timeout - $elapsed;

  return max(0, min($remaining, $wait));
}
}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Thu, Feb 26, 6:39 PM (14 h, 51 m)
Storage Engine
local-disk
Storage Format
Raw Data
Storage Handle
23/39/0db65473eb8216bcc1eb19a2b6ab
Attached To
rPHUTIL libphutil
Event Timeline
Log In to Comment