Page MenuHome GnuPG

No OneTemporary

diff --git a/scripts/daemon/exec/exec_daemon.php b/scripts/daemon/exec/exec_daemon.php
index ab74112..2cb206e 100755
--- a/scripts/daemon/exec/exec_daemon.php
+++ b/scripts/daemon/exec/exec_daemon.php
@@ -1,125 +1,127 @@
#!/usr/bin/env php
<?php
+declare(ticks = 1);
+
// Load the shared script bootstrap; the helpers used below (pht(), idx(),
// the Phutil* classes) are not defined in this file.
require_once dirname(__FILE__).'/../../__init_script__.php';
// When stdout is not a terminal, make this process the leader of a new
// session. Abort if the session cannot be created.
if (!posix_isatty(STDOUT)) {
$sid = posix_setsid();
if ($sid <= 0) {
throw new Exception(pht('Failed to create new process session!'));
}
}
// Describe the command line interface and parse $argv against it.
$args = new PhutilArgumentParser($argv);
$args->setTagline(pht('daemon executor'));
$args->setSynopsis(<<<EOHELP
**exec_daemon.php** [__options__] __daemon__ ...
Run an instance of __daemon__.
EOHELP
);
// Flags are declared without a 'param', "label" takes a value, and the
// trailing wildcard collects the remaining arguments (the daemon class).
$args->parse(
array(
array(
'name' => 'trace',
'help' => pht('Enable debug tracing.'),
),
array(
'name' => 'trace-memory',
'help' => pht('Enable debug memory tracing.'),
),
array(
'name' => 'verbose',
'help' => pht('Enable verbose activity logging.'),
),
array(
'name' => 'label',
'short' => 'l',
'param' => 'label',
'help' => pht(
'Optional process label. Makes "%s" nicer, no behavioral effects.',
'ps'),
),
array(
'name' => 'daemon',
'wildcard' => true,
),
));
$trace_memory = $args->getArg('trace-memory');
// Memory tracing implies general trace mode.
$trace_mode = $args->getArg('trace') || $trace_memory;
$verbose = $args->getArg('verbose');
// If stdin is a terminal, tell the operator that the script is waiting for
// input instead of appearing to hang.
if (function_exists('posix_isatty') && posix_isatty(STDIN)) {
fprintf(STDERR, pht('Reading daemon configuration from stdin...')."\n");
}
// The daemon configuration is read from stdin as a JSON document and then
// validated against the set of recognized keys.
$config = @file_get_contents('php://stdin');
$config = id(new PhutilJSONParser())->parse($config);
PhutilTypeSpec::checkMap(
$config,
array(
'log' => 'optional string|null',
'argv' => 'optional list<wild>',
'load' => 'optional list<string>',
'autoscale' => 'optional wild',
));
// If a log file is configured, route PHP's error log there and install the
// daemon error listener as well.
$log = idx($config, 'log');
if ($log) {
ini_set('error_log', $log);
PhutilErrorHandler::setErrorListener(array('PhutilDaemon', 'errorListener'));
}
// Load any additional libraries named by the configuration before the
// daemon class is looked up.
$load = idx($config, 'load', array());
foreach ($load as $library) {
$library = Filesystem::resolvePath($library);
phutil_load_library($library);
}
PhutilErrorHandler::initialize();
// Resolve the daemon class named on the command line. Exactly one class must
// be given, it must exist in a loaded library, and it must descend from
// PhutilDaemon. Each failed check is reported as a usage error.
$daemon = $args->getArg('daemon');

if (!$daemon) {
  throw new PhutilArgumentUsageException(
    pht('Specify which class of daemon to start.'));
}

if (count($daemon) > 1) {
  throw new PhutilArgumentUsageException(
    pht('Specify exactly one daemon to start.'));
}

// From here on, $daemon is the class name rather than the argument list.
$daemon = head($daemon);

if (!class_exists($daemon)) {
  throw new PhutilArgumentUsageException(
    pht(
      'No class "%s" exists in any known library.',
      $daemon));
}

if (!is_subclass_of($daemon, 'PhutilDaemon')) {
  throw new PhutilArgumentUsageException(
    pht(
      'Class "%s" is not a subclass of "%s".',
      $daemon,
      'PhutilDaemon'));
}
// At this point $daemon holds a class name. Replace it with an instance,
// passing the configured argv list as the single constructor argument.
$argv = idx($config, 'argv', array());
$daemon = newv($daemon, array($argv));
// Apply the command-line flags to the daemon instance.
if ($trace_mode) {
$daemon->setTraceMode();
}
if ($trace_memory) {
$daemon->setTraceMemory();
}
if ($verbose) {
$daemon->setVerbose(true);
}
// Autoscale properties are optional and come from the stdin configuration.
$autoscale = idx($config, 'autoscale');
if ($autoscale) {
$daemon->setAutoscaleProperties($autoscale);
}
$daemon->execute();
diff --git a/src/daemon/PhutilDaemon.php b/src/daemon/PhutilDaemon.php
index 4d7afe6..43a6c3f 100644
--- a/src/daemon/PhutilDaemon.php
+++ b/src/daemon/PhutilDaemon.php
@@ -1,396 +1,410 @@
<?php
/**
* Scaffolding for implementing robust background processing scripts.
*
*
* Autoscaling
* ===========
*
* Autoscaling automatically launches copies of a daemon when it is busy
* (scaling the pool up) and stops them when they're idle (scaling the pool
* down). This is appropriate for daemons which perform highly parallelizable
* work.
*
* To make a daemon support autoscaling, the implementation should look
* something like this:
*
* while (!$this->shouldExit()) {
* if (work_available()) {
* $this->willBeginWork();
* do_work();
* $this->sleep(0);
* } else {
* $this->willBeginIdle();
* $this->sleep(1);
* }
* }
*
* In particular, call @{method:willBeginWork} before becoming busy, and
* @{method:willBeginIdle} when no work is available. If the daemon is launched
* into an autoscale pool, this will cause the pool to automatically scale up
* when busy and down when idle.
*
* See @{class:PhutilHighIntensityIntervalDaemon} for an example of a simple
* autoscaling daemon.
*
* Launching a daemon which does not make these callbacks into an autoscale
* pool will have no effect.
*
* @task overseer Communicating With the Overseer
* @task autoscale Autoscaling Daemon Pools
*/
abstract class PhutilDaemon extends Phobject {
const MESSAGETYPE_STDOUT = 'stdout';
const MESSAGETYPE_HEARTBEAT = 'heartbeat';
const MESSAGETYPE_BUSY = 'busy';
const MESSAGETYPE_IDLE = 'idle';
const MESSAGETYPE_DOWN = 'down';
const WORKSTATE_BUSY = 'busy';
const WORKSTATE_IDLE = 'idle';
private $argv;
private $traceMode;
private $traceMemory;
private $verbose;
private $notifyReceived;
private $inGracefulShutdown;
private $workState = null;
private $idleSince = null;
private $autoscaleProperties = array();
/**
* Enable or disable verbose logging. When enabled, messages passed to
* log() are written to stderr.
*
* @param bool True to enable verbose logging.
* @return this
*/
final public function setVerbose($verbose) {
$this->verbose = $verbose;
return $this;
}
/**
* Get the verbose logging setting.
*
* @return bool|null Value last passed to setVerbose(), or null if never set.
*/
final public function getVerbose() {
return $this->verbose;
}
private static $sighandlerInstalled;
/**
* Create the daemon, install its signal handlers and begin capturing stdout.
*
* The SIGTERM handler is static and is installed only once per process
* (tracked by self::$sighandlerInstalled). The SIGINT and SIGUSR2 handlers
* are bound to this instance.
*
* @param list<wild> Daemon arguments, later available through getArgv().
*/
final public function __construct(array $argv) {
- declare(ticks = 1);
$this->argv = $argv;
if (!self::$sighandlerInstalled) {
self::$sighandlerInstalled = true;
pcntl_signal(SIGTERM, __CLASS__.'::exitOnSignal');
}
- pcntl_signal(SIGINT, array($this, 'onGracefulSignal'));
+ pcntl_signal(SIGINT, array($this, 'onGracefulSignal'));
pcntl_signal(SIGUSR2, array($this, 'onNotifySignal'));
// Without discard mode, this consumes unbounded amounts of memory. Keep
// memory bounded.
PhutilServiceProfiler::getInstance()->enableDiscardMode();
$this->beginStdoutCapture();
}
/**
* Flush and close the stdout capture buffer opened by the constructor.
*/
final public function __destruct() {
$this->endStdoutCapture();
}
/**
* Send a heartbeat message to the overseer. When memory tracing is enabled,
* also write the current memory usage (in KB) to stderr.
*
* @return void
* @task overseer
*/
final public function stillWorking() {
$this->emitOverseerMessage(self::MESSAGETYPE_HEARTBEAT, null);
if ($this->traceMemory) {
$daemon = get_class($this);
fprintf(
STDERR,
- "<%s> %s %s\n",
+ "%s %s %s\n",
'<RAMS>',
$daemon,
pht(
'Memory Usage: %s KB',
new PhutilNumber(memory_get_usage() / 1024, 1)));
}
}
/**
* Test whether the daemon has been asked to shut down gracefully. The flag
* is set by onGracefulSignal() and by sleep() when scaling the pool down.
*
* @return bool|null Truthy once a graceful shutdown has begun.
*/
final public function shouldExit() {
return $this->inGracefulShutdown;
}
/**
 * Sleep for up to `$duration` seconds while staying responsive.
 *
 * The sleep is split into slices of at most 60 seconds, with a heartbeat
 * emitted after every slice. It ends early when a notify signal arrives or
 * a graceful shutdown begins. For an autoscale clone, the slice length is
 * also capped at the scale-down duration, and a clone that has been idle
 * for longer than that duration begins shutting down and tells the overseer
 * not to restart it.
 *
 * @param int Number of seconds to sleep.
 * @return void
 */
final protected function sleep($duration) {
  $this->notifyReceived = false;
  $this->willSleep($duration);
  $this->stillWorking();

  $is_clone = $this->isClonedAutoscaleDaemon();
  $idle_limit = $this->getAutoscaleDownDuration();

  $slice = 60;
  if ($is_clone) {
    $slice = min($slice, $idle_limit);

    if ($this->workState == self::WORKSTATE_IDLE) {
      $idle_for = (time() - $this->idleSince);
      $this->log(pht('Idle for %s seconds.', $idle_for));
    }
  }

  while ($duration > 0 &&
    !$this->notifyReceived &&
    !$this->shouldExit()) {

    // An idle autoscale clone which has outlived the idle limit scales the
    // pool down by exiting. The DOWN message tells the overseer not to
    // restart this process.
    $idle_expired =
      $is_clone &&
      ($this->workState == self::WORKSTATE_IDLE) &&
      $this->idleSince &&
      ($this->idleSince + $idle_limit < time());

    if ($idle_expired) {
      $this->inGracefulShutdown = true;
      $this->emitOverseerMessage(self::MESSAGETYPE_DOWN, null);
      $this->log(
        pht(
          'Daemon was idle for more than %s second(s), '.
          'scaling pool down.',
          new PhutilNumber($idle_limit)));
      break;
    }

    sleep(min($duration, $slice));
    $duration -= $slice;
    $this->stillWorking();
  }
}
/**
* Hook called by sleep() before the daemon sleeps. The default implementation
* does nothing.
*
* @param int Requested sleep duration, as passed to sleep().
* @return void
*/
protected function willSleep($duration) {
return;
}
/**
* Signal handler which terminates the process with exit code 128 + signal
* number. The constructor installs it for SIGTERM.
*
* @param int Number of the signal which was received.
* @return void
*/
public static function exitOnSignal($signo) {
- // Normally, PHP doesn't invoke destructors when existing in response to
+ self::didCatchSignal($signo);
+
+ // Normally, PHP doesn't invoke destructors when exiting in response to
// a signal. This forces it to do so, so we have a fighting chance of
// releasing any locks, leases or resources on our way out.
exit(128 + $signo);
}
/**
* Get the argument list this daemon was constructed with.
*
* @return list<wild> Daemon arguments.
*/
final protected function getArgv() {
return $this->argv;
}
/**
* Run the daemon: invoke the willRun() hook, then the run() implementation.
*
* @return void
*/
final public function execute() {
$this->willRun();
$this->run();
}
// The daemon's main body, supplied by each concrete subclass.
abstract protected function run();
/**
* Enable memory tracing. When enabled, stillWorking() reports memory usage
* on stderr with each heartbeat.
*
* @return this
*/
final public function setTraceMemory() {
$this->traceMemory = true;
return $this;
}
/**
* @return bool|null True if memory tracing has been enabled.
*/
final public function getTraceMemory() {
return $this->traceMemory;
}
/**
* Enable trace mode: install the service profiler's echo listener, enable
* logging on the console server, then invoke the didSetTraceMode() hook.
*
* @return this
*/
final public function setTraceMode() {
$this->traceMode = true;
PhutilServiceProfiler::installEchoListener();
PhutilConsole::getConsole()->getServer()->setEnableLog(true);
$this->didSetTraceMode();
return $this;
}
/**
* @return bool|null True if trace mode has been enabled.
*/
final public function getTraceMode() {
return $this->traceMode;
}
/**
* Signal handler (installed for SIGINT) which requests a graceful shutdown.
* It only sets a flag; the daemon is expected to poll shouldExit().
*
* @param int Number of the signal which was received.
* @return void
*/
final public function onGracefulSignal($signo) {
+ self::didCatchSignal($signo);
$this->inGracefulShutdown = true;
}
/**
* Signal handler (installed for SIGUSR2) which records that a notification
* arrived, causing sleep() to stop early, and then invokes the onNotify()
* hook.
*
* @param int Number of the signal which was received.
* @return void
*/
final public function onNotifySignal($signo) {
+ self::didCatchSignal($signo);
$this->notifyReceived = true;
$this->onNotify($signo);
}
protected function onNotify($signo) {
// This is a hook for subclasses.
}
protected function willRun() {
// This is a hook for subclasses.
}
protected function didSetTraceMode() {
// This is a hook for subclasses.
}
/**
* Write a message to stderr, tagged with "<VERB>" and the daemon class name.
* Does nothing unless verbose mode is enabled.
*
* @param string Message to log.
* @return void
*/
final protected function log($message) {
if ($this->verbose) {
$daemon = get_class($this);
- fprintf(STDERR, "<%s> %s %s\n", '<VERB>', $daemon, $message);
+ fprintf(STDERR, "%s %s %s\n", '<VERB>', $daemon, $message);
}
}
+ private static function didCatchSignal($signo) {
+ $signame = phutil_get_signal_name($signo);
+ fprintf(
+ STDERR,
+ "%s Caught signal %s (%s).\n",
+ '<SGNL>',
+ $signo,
+ $signame);
+ }
+
/* -( Communicating With the Overseer )------------------------------------ */
/**
* Start an output buffer which passes everything written to stdout through
* didReceiveStdout(). The second argument to ob_start() is the chunk size.
*
* @return void
* @task overseer
*/
private function beginStdoutCapture() {
ob_start(array($this, 'didReceiveStdout'), 2);
}
/**
* Flush and close the current output buffer.
*
* @return void
* @task overseer
*/
private function endStdoutCapture() {
ob_end_flush();
}
/**
* Output buffer callback: wrap captured stdout data in a STDOUT overseer
* message. Empty data produces no output.
*
* @param string Raw data written to stdout.
* @return string Encoded overseer message, or the empty string.
* @task overseer
*/
public function didReceiveStdout($data) {
if (!strlen($data)) {
return '';
}
+
return $this->encodeOverseerMessage(self::MESSAGETYPE_STDOUT, $data);
}
/**
 * Serialize an overseer message as a single newline-terminated line of JSON.
 *
 * The encoded structure is a list whose first element is the message type.
 * The payload is appended as a second element only when it is not null.
 *
 * @param string Message type, one of the MESSAGETYPE constants.
 * @param wild|null Optional message payload.
 * @return string JSON-encoded message followed by a newline.
 * @task overseer
 */
private function encodeOverseerMessage($type, $data) {
  $parts = ($data === null)
    ? array($type)
    : array($type, $data);

  return json_encode($parts)."\n";
}
/**
* Write a message to the overseer on stdout.
*
* Stdout capture is closed before the message is echoed and reopened
* afterwards, so the message does not pass through the didReceiveStdout()
* callback.
*
* @param string Message type, one of the MESSAGETYPE constants.
* @param wild|null Optional message payload.
* @return void
* @task overseer
*/
private function emitOverseerMessage($type, $data) {
$this->endStdoutCapture();
echo $this->encodeOverseerMessage($type, $data);
$this->beginStdoutCapture();
}
/**
* Error listener which echoes error messages and stack traces to stderr.
*
* Only the 'default_message' and 'trace' keys of the metadata are read;
* the event and value arguments are unused here.
*
* @param wild Event type (unused).
* @param wild Event value (unused).
* @param map<string, wild> Error metadata.
* @return void
* @task overseer
*/
public static function errorListener($event, $value, array $metadata) {
// If the caller has redirected the error log to a file, PHP won't output
// messages to stderr, so the overseer can't capture them. Install a
// listener which just echoes errors to stderr, so the overseer is always
// aware of errors.
$console = PhutilConsole::getConsole();
$message = idx($metadata, 'default_message');
if ($message) {
$console->writeErr("%s\n", $message);
}
if (idx($metadata, 'trace')) {
$trace = PhutilErrorHandler::formatStacktrace($metadata['trace']);
$console->writeErr("%s\n", $trace);
}
}
/* -( Autoscaling )-------------------------------------------------------- */
/**
 * Mark the daemon as busy. This may autoscale the pool up.
 *
 * On a transition into the busy state, the idle timer is cleared and the
 * overseer is sent a BUSY message. If the daemons in an autoscale pool stay
 * busy for a prolonged period, the overseer may scale the pool up. Calling
 * this while already busy has no effect.
 *
 * @return this
 * @task autoscale
 */
protected function willBeginWork() {
  $already_busy = ($this->workState == self::WORKSTATE_BUSY);
  if ($already_busy) {
    return $this;
  }

  $this->workState = self::WORKSTATE_BUSY;
  $this->idleSince = null;
  $this->emitOverseerMessage(self::MESSAGETYPE_BUSY, null);

  return $this;
}
/**
 * Mark the daemon as idle. This may autoscale the pool down.
 *
 * On a transition into the idle state, the time is recorded and the
 * overseer is sent an IDLE message. Daemons in an autoscale pool which stay
 * idle for a prolonged period may exit to scale the pool down. Calling this
 * while already idle has no effect and does not reset the idle timer.
 *
 * @return this
 * @task autoscale
 */
protected function willBeginIdle() {
  $already_idle = ($this->workState == self::WORKSTATE_IDLE);
  if ($already_idle) {
    return $this;
  }

  $this->workState = self::WORKSTATE_IDLE;
  $this->idleSince = time();
  $this->emitOverseerMessage(self::MESSAGETYPE_IDLE, null);

  return $this;
}
/**
* Determine if this is a clone or the original daemon.
*
* @return bool True if this is a cloned autoscaling daemon.
* @task autoscale
*/
private function isClonedAutoscaleDaemon() {
return (bool)$this->getAutoscaleProperty('clone', false);
}
/**
* Get the duration (in seconds) which a daemon must be continuously idle
* for before it should exit to scale the pool down.
*
* @return int Duration, in seconds.
* @task autoscale
*/
private function getAutoscaleDownDuration() {
return $this->getAutoscaleProperty('down', 15);
}
/**
* Configure autoscaling for this daemon.
*
* @param map<string, wild> Map of autoscale properties.
* @return this
* @task autoscale
*/
public function setAutoscaleProperties(array $autoscale_properties) {
PhutilTypeSpec::checkMap(
$autoscale_properties,
array(
'group' => 'optional string',
'up' => 'optional int',
'down' => 'optional int',
'pool' => 'optional int',
'clone' => 'optional bool',
'reserve' => 'optional int|float',
));
$this->autoscaleProperties = $autoscale_properties;
return $this;
}
/**
* Read autoscaling configuration for this daemon.
*
* @param string Property to read.
* @param wild Default value to return if the property is not set.
* @return wild Property value, or `$default` if one is not set.
* @task autoscale
*/
private function getAutoscaleProperty($key, $default = null) {
return idx($this->autoscaleProperties, $key, $default);
}
}
diff --git a/src/daemon/PhutilDaemonHandle.php b/src/daemon/PhutilDaemonHandle.php
index 37858fb..3de873c 100644
--- a/src/daemon/PhutilDaemonHandle.php
+++ b/src/daemon/PhutilDaemonHandle.php
@@ -1,419 +1,422 @@
<?php
final class PhutilDaemonHandle extends Phobject {
const EVENT_DID_LAUNCH = 'daemon.didLaunch';
const EVENT_DID_LOG = 'daemon.didLogMessage';
const EVENT_DID_HEARTBEAT = 'daemon.didHeartbeat';
const EVENT_WILL_GRACEFUL = 'daemon.willGraceful';
const EVENT_WILL_EXIT = 'daemon.willExit';
private $overseer;
private $daemonClass;
private $argv;
private $config;
private $pid;
private $daemonID;
private $deadline;
private $heartbeat;
private $stdoutBuffer;
private $restartAt;
private $silent;
private $shouldRestart = true;
private $shouldShutdown;
private $future;
private $traceMemory;
/**
* Create a handle for one supervised daemon and announce it.
*
* No process is started here. The handle is made eligible to start on the
* next update() (restartAt is set to the current time), a daemon ID is
* generated, and an EVENT_DID_LAUNCH event is dispatched.
*
* @param PhutilDaemonOverseer Overseer which owns this handle.
* @param string Name of the daemon class to run.
* @param list<wild> Command-line arguments for the daemon executor.
* @param map<string, wild> Configuration written to the child's stdin.
*/
public function __construct(
PhutilDaemonOverseer $overseer,
$daemon_class,
array $argv,
array $config) {
$this->overseer = $overseer;
$this->daemonClass = $daemon_class;
$this->argv = $argv;
$this->config = $config;
$this->restartAt = time();
$this->daemonID = $this->generateDaemonID();
$this->dispatchEvent(
self::EVENT_DID_LAUNCH,
array(
'argv' => $this->argv,
'explicitArgv' => idx($this->config, 'argv'),
));
}
/**
* @return bool True while this handle holds a future for a child process.
*/
public function isRunning() {
return (bool)$this->future;
}
/**
* @return bool True if the daemon is not running and will not be restarted.
*/
public function isDone() {
return (!$this->shouldRestart && !$this->isRunning());
}
/**
* @return ExecFuture|null Future for the child process, or null if none.
*/
public function getFuture() {
return $this->future;
}
/**
* Enable or disable silent mode. In silent mode, logMessage() does not echo
* messages, but log events are still dispatched.
*
* @param bool True to suppress echoed log output.
* @return this
*/
public function setSilent($silent) {
$this->silent = $silent;
return $this;
}
public function getSilent() {
return $this->silent;
}
/**
* Enable or disable memory tracing. When enabled, each update() logs the
* overseer's memory usage.
*
* @param bool True to enable memory tracing.
* @return this
*/
public function setTraceMemory($trace_memory) {
$this->traceMemory = $trace_memory;
return $this;
}
public function getTraceMemory() {
return $this->traceMemory;
}
/**
* Advance this handle's state machine by one step.
*
* If no process is running, start one when a restart is allowed and due.
* Otherwise, read any new output from the child, log its stderr, handle its
* exit if it has resolved, and then update heartbeat events and hang
* detection.
*
* @return void
*/
public function update() {
$this->updateMemory();
if (!$this->isRunning()) {
// Not running: only start a process if restarts are allowed, the restart
// time has been reached, and no shutdown has been requested.
if (!$this->shouldRestart) {
return;
}
if (!$this->restartAt || (time() < $this->restartAt)) {
return;
}
if ($this->shouldShutdown) {
return;
}
$this->startDaemonProcess();
}
$future = $this->future;
$result = null;
if ($future->isReady()) {
$result = $future->resolve();
}
// Read output produced since the last update, then drop the buffers.
list($stdout, $stderr) = $future->read();
$future->discardBuffers();
if (strlen($stdout)) {
$this->didReadStdout($stdout);
}
$stderr = trim($stderr);
if (strlen($stderr)) {
- $this->logMessage('STDE', $stderr);
+ foreach (phutil_split_lines($stderr, false) as $line) {
+ $this->logMessage('STDE', $line);
+ }
}
if ($result !== null) {
// The process has exited; the first element of the result is its exit
// code.
list($err) = $result;
+
if ($err) {
- $this->logMessage('FAIL', pht('Process exited with error %s', $err));
+ $this->logMessage('FAIL', pht('Process exited with error %s.', $err));
} else {
$this->logMessage('DONE', pht('Process exited normally.'));
}
$this->future = null;
if ($this->shouldShutdown) {
$this->restartAt = null;
} else {
$this->scheduleRestart();
}
}
$this->updateHeartbeatEvent();
$this->updateHangDetection();
}
/**
* Dispatch an EVENT_DID_HEARTBEAT event if the next heartbeat time has been
* reached, and schedule the following one.
*
* @return void
*/
private function updateHeartbeatEvent() {
if ($this->heartbeat > time()) {
return;
}
$this->heartbeat = time() + $this->getHeartbeatEventFrequency();
$this->dispatchEvent(self::EVENT_DID_HEARTBEAT);
}
/**
* Kill the daemon's process group and schedule a restart if the running
* process has passed its heartbeat deadline. The deadline is set when the
* process starts and extended whenever a HEARTBEAT message is read.
*
* @return void
*/
private function updateHangDetection() {
if (!$this->isRunning()) {
return;
}
if (time() > $this->deadline) {
$this->logMessage('HANG', pht('Hang detected. Restarting process.'));
$this->annihilateProcessGroup();
$this->scheduleRestart();
}
}
/**
* Log that a restart is pending and set the earliest restart time to
* getWaitBeforeRestart() seconds from now.
*
* @return void
*/
private function scheduleRestart() {
$this->logMessage('WAIT', pht('Waiting to restart process.'));
$this->restartAt = time() + self::getWaitBeforeRestart();
}
/**
* Generate a unique ID for this daemon.
*
* @return string A unique daemon ID.
*/
private function generateDaemonID() {
// The ID is "<overseer pid>:<random characters>" truncated to 12 characters
// in total, so the PID and separator take up part of that length and only
// the remainder is random.
return substr(getmypid().':'.Filesystem::readRandomCharacters(12), 0, 12);
}
/**
* @return string ID generated for this handle at construction.
*/
public function getDaemonID() {
return $this->daemonID;
}
/**
* @return int|null PID of the child process, or null if none is recorded.
*/
public function getPID() {
return $this->pid;
}
// Size limit, in bytes, applied to the child's stdout and stderr buffers.
private function getCaptureBufferSize() {
return 65535;
}
// Seconds a running process may go without a heartbeat before it is treated
// as hung.
private function getRequiredHeartbeatFrequency() {
return 86400;
}
// Seconds to wait before restarting a process.
public static function getWaitBeforeRestart() {
return 5;
}
// Seconds between EVENT_DID_HEARTBEAT events.
public static function getHeartbeatEventFrequency() {
return 120;
}
// Seconds to wait between sending SIGTERM and SIGKILL to a process group.
private function getKillDelay() {
return 3;
}
/**
* Get the working directory for the child process: the directory containing
* exec_daemon.php, resolved relative to the parent of the 'phutil' library
* root.
*
* @return string Directory path, with a trailing slash.
*/
private function getDaemonCWD() {
$root = dirname(phutil_get_library_root('phutil'));
return $root.'/scripts/daemon/exec/';
}
/**
* Build (but do not start) the future which runs exec_daemon.php for this
* daemon. The daemon configuration is JSON-encoded and queued for the
* child's stdin, and output buffers are capped at getCaptureBufferSize().
*
* @return ExecFuture Unstarted future for the daemon process.
*/
private function newExecFuture() {
$class = $this->daemonClass;
$argv = $this->argv;
$buffer_size = $this->getCaptureBufferSize();
// NOTE: PHP implements proc_open() by running 'sh -c'. On most systems this
// is bash, but on Ubuntu it's dash. When you proc_open() using bash, you
// get one new process (the command you ran). When you proc_open() using
// dash, you get two new processes: the command you ran and a parent
// "dash -c" (or "sh -c") process. This means that the child process's PID
// is actually the 'dash' PID, not the command's PID. To avoid this, use
// 'exec' to replace the shell process with the real process; without this,
// the child will call posix_getppid(), be given the pid of the 'sh -c'
// process, and send it SIGUSR1 to keepalive which will terminate it
// immediately. We also won't be able to do process group management because
// the shell process won't properly posix_setsid() so the pgid of the child
// won't be meaningful.
return id(new ExecFuture('exec ./exec_daemon.php %s %Ls', $class, $argv))
->setCWD($this->getDaemonCWD())
->setStdoutSizeLimit($buffer_size)
->setStderrSizeLimit($buffer_size)
->write(json_encode($this->config));
}
/**
* Dispatch an event to event listeners.
*
* @param string Event type.
* @param dict Event parameters.
* @return void
*/
private function dispatchEvent($type, array $params = array()) {
// With the array union operator the left operand wins on duplicate keys, so
// caller-supplied params cannot override 'id', 'daemonClass' or 'childPID'.
$data = array(
'id' => $this->daemonID,
'daemonClass' => $this->daemonClass,
'childPID' => $this->pid,
) + $params;
$event = new PhutilEvent($type, $data);
// A failing listener must not take the overseer down: log the exception and
// carry on.
try {
PhutilEventEngine::dispatchEvent($event);
} catch (Exception $ex) {
phlog($ex);
}
}
/**
 * Forcibly terminate the daemon's entire process group.
 *
 * Sends SIGTERM to the group, waits getKillDelay() seconds, then sends
 * SIGKILL, and finally forgets the child PID. Does nothing when no child
 * PID is recorded or when its process group cannot be determined.
 *
 * @return void
 */
private function annihilateProcessGroup() {
  $pid = $this->pid;

  // Check the PID before looking up its group. Passing an empty PID to
  // posix_getpgid() is treated as PID 0, which asks about the overseer
  // itself rather than the child.
  if (!$pid) {
    return;
  }

  // posix_getpgid() returns false on failure, for example when the process
  // has already exited.
  $pgid = posix_getpgid($pid);
  if (!$pgid) {
    return;
  }

  // A negative PID addresses every process in the group.
  posix_kill(-$pgid, SIGTERM);
  sleep($this->getKillDelay());
  posix_kill(-$pgid, SIGKILL);

  $this->pid = null;
}
/**
* When memory tracing is enabled, log the overseer's current memory usage
* in KB.
*
* @return void
*/
private function updateMemory() {
if ($this->traceMemory) {
$this->logMessage(
'RAMS',
pht(
'Overseer Memory Usage: %s KB',
new PhutilNumber(memory_get_usage() / 1024, 1)));
}
}
/**
* Launch the daemon child process. Resets the hang deadline, the heartbeat
* event timer and the stdout buffer, then starts a new future and records
* its PID.
*
* @return void
*/
private function startDaemonProcess() {
$this->logMessage('INIT', pht('Starting process.'));
$this->deadline = time() + $this->getRequiredHeartbeatFrequency();
$this->heartbeat = time() + self::getHeartbeatEventFrequency();
$this->stdoutBuffer = '';
$this->future = $this->newExecFuture();
$this->future->start();
$this->pid = $this->future->getPID();
}
/**
* Consume stdout data from the child and act on each complete message.
*
* Data is appended to an internal buffer and split on newlines; a trailing
* partial line stays in the buffer until more data arrives. Each line is
* decoded as JSON and dispatched on its first element, the message type.
*
* @param string Newly read stdout data.
* @return void
*/
private function didReadStdout($data) {
$this->stdoutBuffer .= $data;
while (true) {
$pos = strpos($this->stdoutBuffer, "\n");
if ($pos === false) {
break;
}
$message = substr($this->stdoutBuffer, 0, $pos);
$this->stdoutBuffer = substr($this->stdoutBuffer, $pos + 1);
// Lines which are not valid JSON fall through to the default case below.
try {
$structure = phutil_json_decode($message);
} catch (PhutilJSONParserException $ex) {
$structure = array();
}
switch (idx($structure, 0)) {
case PhutilDaemon::MESSAGETYPE_STDOUT:
$this->logMessage('STDO', idx($structure, 1));
break;
case PhutilDaemon::MESSAGETYPE_HEARTBEAT:
// A heartbeat pushes the hang-detection deadline forward.
$this->deadline = time() + $this->getRequiredHeartbeatFrequency();
break;
case PhutilDaemon::MESSAGETYPE_BUSY:
$this->overseer->didBeginWork($this);
break;
case PhutilDaemon::MESSAGETYPE_IDLE:
$this->overseer->didBeginIdle($this);
break;
case PhutilDaemon::MESSAGETYPE_DOWN:
// The daemon is exiting because it doesn't have enough work and it
// is trying to scale the pool down. We should not restart it.
$this->shouldRestart = false;
$this->shouldShutdown = true;
break;
default:
// If we can't parse this or it isn't a message we understand, just
// emit the raw message.
$this->logMessage('STDO', pht('<Malformed> %s', $message));
break;
}
}
}
/**
 * Forward a notification signal to the daemon process, if a child PID is
 * recorded.
 *
 * @param int Number of the signal to forward.
 * @return void
 */
public function didReceiveNotifySignal($signo) {
  $child_pid = $this->pid;
  if (!$child_pid) {
    return;
  }

  posix_kill($child_pid, $signo);
}
/**
 * Respond to a reload signal by gracefully stopping the current process so
 * that an identical one is launched in its place.
 *
 * @param int Number of the signal the overseer received.
 * @return void
 */
public function didReceiveReloadSignal($signo) {
  $signame = phutil_get_signal_name($signo);
  if ($signame) {
    $sigmsg = pht(
      'Reloading in response to signal %d (%s).',
      $signo,
      $signame);
  } else {
    $sigmsg = pht(
      'Reloading in response to signal %d.',
      $signo);
  }

  $this->logMessage('RELO', $sigmsg, $signo);

  // This signal means "stop the current process gracefully, then launch
  // a new identical process once it exits". This can be used to update
  // daemons after code changes (the new processes will run the new code)
  // without aborting any running tasks.

  // We SIGINT the daemon but don't set the shutdown flag, so it will
  // naturally be restarted after it exits, as though it had exited after an
  // unhandled exception.

  // Only signal a recorded child. With no PID (the process has not been
  // started yet, or its group was already killed), posix_kill() would be
  // given PID 0 and deliver SIGINT to every process in the overseer's own
  // process group.
  $pid = $this->pid;
  if ($pid) {
    posix_kill($pid, SIGINT);
  }
}
/**
 * Respond to a graceful shutdown signal: stop restarting this daemon and
 * ask the running process, if any, to exit by sending it SIGINT.
 *
 * @param int Number of the signal the overseer received.
 * @return void
 */
public function didReceiveGracefulSignal($signo) {
  $this->shouldShutdown = true;
  $this->shouldRestart = false;

  $signame = phutil_get_signal_name($signo);
  if ($signame) {
    $sigmsg = pht(
      'Graceful shutdown in response to signal %d (%s).',
      $signo,
      $signame);
  } else {
    $sigmsg = pht(
      'Graceful shutdown in response to signal %d.',
      $signo);
  }

  $this->logMessage('DONE', $sigmsg, $signo);

  // Only signal a recorded child. With no PID (the process has not been
  // started yet, or its group was already killed), posix_kill() would be
  // given PID 0 and deliver SIGINT to every process in the overseer's own
  // process group.
  $pid = $this->pid;
  if ($pid) {
    posix_kill($pid, SIGINT);
  }
}
/**
* Respond to a terminal signal: stop restarting this daemon and forcibly
* kill its process group.
*
* @param int Number of the signal the overseer received.
* @return void
*/
public function didReceiveTerminalSignal($signo) {
$this->shouldShutdown = true;
$this->shouldRestart = false;
$signame = phutil_get_signal_name($signo);
if ($signame) {
$sigmsg = pht(
'Shutting down in response to signal %s (%s).',
$signo,
$signame);
} else {
$sigmsg = pht('Shutting down in response to signal %s.', $signo);
}
$this->logMessage('EXIT', $sigmsg, $signo);
$this->annihilateProcessGroup();
}
/**
* Record a log message. Unless the handle is silent, the message is echoed
* with a timestamp and type tag; an EVENT_DID_LOG event is always dispatched.
*
* @param string Short message type tag, such as 'INIT' or 'FAIL'.
* @param string Message text.
* @param wild Optional context passed along with the event.
* @return void
*/
private function logMessage($type, $message, $context = null) {
if (!$this->getSilent()) {
echo date('Y-m-d g:i:s A').' ['.$type.'] '.$message."\n";
}
$this->dispatchEvent(
self::EVENT_DID_LOG,
array(
'type' => $type,
'message' => $message,
'context' => $context,
));
}
/**
* Dispatch an EVENT_WILL_EXIT event for this daemon.
*
* @return void
*/
public function didRemoveDaemon() {
$this->dispatchEvent(self::EVENT_WILL_EXIT);
}
}
diff --git a/src/future/exec/ExecFuture.php b/src/future/exec/ExecFuture.php
index 23bdb21..11ecca1 100644
--- a/src/future/exec/ExecFuture.php
+++ b/src/future/exec/ExecFuture.php
@@ -1,904 +1,915 @@
<?php
/**
* Execute system commands in parallel using futures.
*
* ExecFuture is a future, which means it runs asynchronously and represents
* a value which may not exist yet. See @{article:Using Futures} for an
* explanation of futures. When an ExecFuture resolves, it returns the exit
* code, stdout and stderr of the process it executed.
*
* ExecFuture is the core command execution implementation in libphutil, but is
* exposed through a number of APIs. See @{article:Command Execution} for more
* discussion about executing system commands.
*
* @task create Creating ExecFutures
* @task resolve Resolving Execution
* @task config Configuring Execution
* @task info Command Information
* @task interact Interacting With Commands
* @task internal Internals
*/
final class ExecFuture extends PhutilExecutableFuture {
private $pipes = array();
private $proc = null;
private $start = null;
private $timeout = null;
private $procStatus = null;
private $stdout = null;
private $stderr = null;
private $stdin = null;
private $closePipe = true;
private $stdoutPos = 0;
private $stderrPos = 0;
private $command = null;
private $readBufferSize;
private $stdoutSizeLimit = PHP_INT_MAX;
private $stderrSizeLimit = PHP_INT_MAX;
private $profilerCallID;
private $killedByTimeout;
private $useWindowsFileStreams = false;
private $windowsStdoutTempFile = null;
private $windowsStderrTempFile = null;
private static $descriptorSpec = array(
0 => array('pipe', 'r'), // stdin
1 => array('pipe', 'w'), // stdout
2 => array('pipe', 'w'), // stderr
);
/* -( Creating ExecFutures )----------------------------------------------- */
/**
* Create a new ExecFuture.
*
* $future = new ExecFuture('wc -l %s', $file_path);
*
* @param string `sprintf()`-style command string which will be passed
* through @{function:csprintf} with the rest of the arguments.
* @param ... Zero or more additional arguments for @{function:csprintf}.
* @return ExecFuture ExecFuture for running the specified command.
* @task create
*/
public function __construct($command) {
$argv = func_get_args();
$this->command = call_user_func_array('csprintf', $argv);
$this->stdin = new PhutilRope();
}
/* -( Command Information )------------------------------------------------ */
/**
* Retrieve the raw command to be executed.
*
* @return string Raw command.
* @task info
*/
public function getCommand() {
return $this->command;
}
/**
* Retrieve the byte limit for the stderr buffer.
*
* @return int Maximum buffer size, in bytes.
* @task info
*/
public function getStderrSizeLimit() {
return $this->stderrSizeLimit;
}
/**
* Retrieve the byte limit for the stdout buffer.
*
* @return int Maximum buffer size, in bytes.
* @task info
*/
public function getStdoutSizeLimit() {
return $this->stdoutSizeLimit;
}
/**
* Get the process's pid. This only works after execution is initiated, e.g.
* by a call to start().
*
* @return int Process ID of the executing process.
* @task info
*/
public function getPID() {
$status = $this->procGetStatus();
return $status['pid'];
}
/* -( Configuring Execution )---------------------------------------------- */
/**
* Set a maximum size for the stdout read buffer. To limit stderr, see
* @{method:setStderrSizeLimit}. The major use of these methods is to use less
* memory if you are running a command which sometimes produces huge volumes
* of output that you don't really care about.
*
* NOTE: Setting this to 0 means "no buffer", not "unlimited buffer".
*
* @param int Maximum size of the stdout read buffer.
* @return this
* @task config
*/
public function setStdoutSizeLimit($limit) {
$this->stdoutSizeLimit = $limit;
return $this;
}
/**
* Set a maximum size for the stderr read buffer.
* See @{method:setStdoutSizeLimit} for discussion.
*
* @param int Maximum size of the stderr read buffer.
* @return this
* @task config
*/
public function setStderrSizeLimit($limit) {
$this->stderrSizeLimit = $limit;
return $this;
}
/**
* Set the maximum internal read buffer size for this future. The future will
* block reads once the internal stdout or stderr buffer exceeds this size.
*
* NOTE: If you @{method:resolve} a future with a read buffer limit, you may
* block forever!
*
* TODO: We should probably release the read buffer limit during
* @{method:resolve}, or otherwise detect this. For now, be careful.
*
* @param int|null Maximum buffer size, or `null` for unlimited.
* @return this
*/
public function setReadBufferSize($read_buffer_size) {
$this->readBufferSize = $read_buffer_size;
return $this;
}
/**
* Set whether to use non-blocking streams on Windows.
*
* @param bool Whether to use non-blocking streams.
* @return this
* @task config
*/
public function setUseWindowsFileStreams($use_streams) {
if (phutil_is_windows()) {
$this->useWindowsFileStreams = $use_streams;
}
return $this;
}
/* -( Interacting With Commands )------------------------------------------ */
/**
* Read and return output from stdout and stderr, if any is available. This
* method keeps a read cursor on each stream, but the entire streams are
* still returned when the future resolves. You can call read() again after
* resolving the future to retrieve only the parts of the streams you did not
* previously read:
*
* $future = new ExecFuture('...');
* // ...
* list($stdout) = $future->read(); // Returns output so far
* list($stdout) = $future->read(); // Returns new output since first call
* // ...
* list($stdout) = $future->resolvex(); // Returns ALL output
* list($stdout) = $future->read(); // Returns unread output
*
* NOTE: If you set a limit with @{method:setStdoutSizeLimit} or
* @{method:setStderrSizeLimit}, this method will not be able to read data
* past the limit.
*
* NOTE: If you call @{method:discardBuffers}, all the stdout/stderr data
* will be thrown away and the cursors will be reset.
*
* @return pair <$stdout, $stderr> pair with new output since the last call
* to this method.
* @task interact
*/
public function read() {
$stdout = $this->readStdout();
$result = array(
$stdout,
(string)substr($this->stderr, $this->stderrPos),
);
$this->stderrPos = strlen($this->stderr);
return $result;
}
/**
* Read and return stdout output produced since the last read.
*
* If the future has been started, it is synchronized first by calling
* isReady(). The stdout read cursor is then advanced to the end of the
* buffer.
*
* @return string New stdout data since the last call.
* @task interact
*/
public function readStdout() {
if ($this->start) {
$this->isReady(); // Sync
}
$result = (string)substr($this->stdout, $this->stdoutPos);
$this->stdoutPos = strlen($this->stdout);
return $result;
}
/**
* Write data to stdin of the command.
*
* @param string Data to write.
* @param bool If true, keep the pipe open for writing. By default, the pipe
* will be closed as soon as possible so that commands which
* listen for EOF will execute. If you want to keep the pipe open
* past the start of command execution, do an empty write with
* `$keep_pipe = true` first.
* @return this
* @task interact
*/
public function write($data, $keep_pipe = false) {
if (strlen($data)) {
if (!$this->stdin) {
throw new Exception(pht('Writing to a closed pipe!'));
}
$this->stdin->append($data);
}
$this->closePipe = !$keep_pipe;
return $this;
}
/**
* Permanently discard the stdout and stderr buffers and reset the read
* cursors. This is basically useful only if you are streaming a large amount
* of data from some process:
*
* $future = new ExecFuture('zcat huge_file.gz');
* do {
* $done = $future->resolve(0.1); // Every 100ms,
* list($stdout) = $future->read(); // read output...
* echo $stdout; // send it somewhere...
* $future->discardBuffers(); // and then free the buffers.
* } while ($done === null);
*
* Conceivably you might also need to do this if you're writing a client using
* @{class:ExecFuture} and `netcat`, but you probably should not do that.
*
* NOTE: This completely discards the data. It won't be available when the
* future resolves. This is almost certainly only useful if you need the
* buffer memory for some reason.
*
* @return this
* @task interact
*/
public function discardBuffers() {
$this->discardStdoutBuffer();
$this->stderr = '';
$this->stderrPos = 0;
return $this;
}
/**
* Permanently discard the stdout buffer and reset its read cursor. The
* stderr buffer is left untouched.
*
* @return this
* @task interact
*/
public function discardStdoutBuffer() {
$this->stdout = '';
$this->stdoutPos = 0;
return $this;
}
/**
* Returns true if this future was killed by a timeout configured with
* @{method:setTimeout}.
*
* @return bool True if the future was killed for exceeding its time limit.
*/
public function getWasKilledByTimeout() {
return $this->killedByTimeout;
}
/* -( Configuring Execution )---------------------------------------------- */
/**
* Set a hard limit on execution time. If the command runs longer, it will
* be killed and the future will resolve with an error code. You can test
* if a future was killed by a timeout with @{method:getWasKilledByTimeout}.
*
* @param int Maximum number of seconds this command may execute for.
* @return this
* @task config
*/
public function setTimeout($seconds) {
$this->timeout = $seconds;
return $this;
}
/* -( Resolving Execution )------------------------------------------------ */
/**
* Resolve a command you expect to exit with return code 0. Works like
* @{method:resolve}, but throws if $err is nonempty. Returns only
* $stdout and $stderr. See also @{function:execx}.
*
* list($stdout, $stderr) = $future->resolvex();
*
* @param float Optional timeout after which resolution will pause and
* execution will return to the caller.
* @return pair <$stdout, $stderr> pair.
* @task resolve
*/
public function resolvex($timeout = null) {
  $result = $this->resolve($timeout);
  list($err, $stdout, $stderr) = $result;

  // A falsy error code means the command succeeded: hand back just the
  // two output streams.
  if (!$err) {
    return array($stdout, $stderr);
  }

  // Otherwise, surface the failure together with everything we know about
  // the command and its output.
  throw new CommandException(
    pht('Command failed with error #%d!', $err),
    $this->command,
    $err,
    $stdout,
    $stderr);
}
/**
* Resolve a command you expect to return valid JSON. Works like
* @{method:resolvex}, but also throws if stderr is nonempty, or stdout is not
* valid JSON. Returns a PHP array, decoded from the JSON command output.
*
* @param float Optional timeout after which resolution will pause and
* execution will return to the caller.
* @return array PHP array, decoded from JSON command output.
* @task resolve
*/
public function resolveJSON($timeout = null) {
  // resolvex() throws a CommandException if the command exits with a
  // nonzero error code, so from here on we only validate the output.
  list($stdout, $stderr) = $this->resolvex($timeout);

  // Any text on stderr is treated as a failure, even though the command
  // exited cleanly.
  if (strlen($stderr)) {
    $cmd = $this->command;
    throw new CommandException(
      pht(
        // NOTE: The stderr text is a string, so it must be formatted with
        // "%s". Formatting it with "%d" casts it to an integer and loses the
        // actual message.
        "JSON command '%s' emitted text to stderr when none was expected: %s",
        $cmd,
        $stderr),
      $cmd,
      0,
      $stdout,
      $stderr);
  }

  try {
    return phutil_json_decode($stdout);
  } catch (PhutilJSONParserException $ex) {
    // Convert the parser failure into a CommandException so callers only
    // need to handle one exception type for this method.
    $cmd = $this->command;
    throw new CommandException(
      pht(
        "JSON command '%s' did not produce a valid JSON object on stdout: %s",
        $cmd,
        $stdout),
      $cmd,
      0,
      $stdout,
      $stderr);
  }
}
/**
* Resolve the process by abruptly terminating it.
*
* @return list List of <err, stdout, stderr> results.
* @task resolve
*/
public function resolveKill() {
  // If the future already has a result there is nothing left to kill; just
  // return the existing result.
  if (!$this->result) {
    // SIGKILL is only defined when the pcntl extension is loaded; fall back
    // to its conventional numeric value otherwise.
    if (defined('SIGKILL')) {
      $signal = SIGKILL;
    } else {
      $signal = 9;
    }

    // Only signal a process we actually hold a handle for. If this future
    // was never started, there is no process resource, and passing a null
    // handle to proc_terminate() is an error.
    if ($this->proc) {
      proc_terminate($this->proc, $signal);
    }

    // Report the result as "128 + signal", alongside whatever output has
    // been buffered so far.
    $this->result = array(
      128 + $signal,
      $this->stdout,
      $this->stderr,
    );

    $this->closeProcess();
  }

  return $this->result;
}
/* -( Internals )---------------------------------------------------------- */
/**
* Provides read sockets to the future core.
*
* @return list List of read sockets.
* @task internal
*/
public function getReadSockets() {
  list($in_pipe, $out_pipe, $err_pipe) = $this->pipes;

  // Only the two output pipes are candidates for reading. A pipe is offered
  // if it is still open on our side and has not reached end-of-file.
  $readable = array();
  foreach (array($out_pipe, $err_pipe) as $pipe) {
    if ($pipe === null) {
      continue;
    }

    if (feof($pipe)) {
      continue;
    }

    $readable[] = $pipe;
  }

  return $readable;
}
/**
* Provides write sockets to the future core.
*
* @return list List of write sockets.
* @task internal
*/
public function getWriteSockets() {
  list($in_pipe, $out_pipe, $err_pipe) = $this->pipes;

  $writable = array();

  // The stdin pipe has already been closed.
  if (!isset($in_pipe)) {
    return $writable;
  }

  // Nothing is queued for writing, so there is no reason to wait on it.
  if (!$this->stdin->getByteLength()) {
    return $writable;
  }

  if (feof($in_pipe)) {
    return $writable;
  }

  $writable[] = $in_pipe;

  return $writable;
}
/**
* Determine if the read buffer is empty.
*
* @return bool True if the read buffer is empty.
* @task internal
*/
public function isReadBufferEmpty() {
  // Only the stdout buffer is examined here.
  if (strlen($this->stdout)) {
    return false;
  }

  return true;
}
/**
* Determine if the write buffer is empty.
*
* @return bool True if the write buffer is empty.
* @task internal
*/
public function isWriteBufferEmpty() {
  // The buffer is empty exactly when no bytes are pending.
  $pending_bytes = $this->getWriteBufferSize();

  return !$pending_bytes;
}
/**
* Determine the number of bytes in the write buffer.
*
* @return int Number of bytes in the write buffer.
* @task internal
*/
public function getWriteBufferSize() {
  // With no stdin buffer there is nothing queued for writing.
  $buffer = $this->stdin;

  return $buffer ? $buffer->getByteLength() : 0;
}
/**
* Reads some bytes from a stream, discarding output once a certain amount
* has been accumulated.
*
* @param resource Stream to read from.
* @param int Maximum number of bytes to return from $stream. If
* additional bytes are available, they will be read and
* discarded.
* @param string Human-readable description of stream, for exception
* message.
* @param int Maximum number of bytes to read.
* @return string The data read from the stream.
* @task internal
*/
private function readAndDiscard($stream, $limit, $description, $length) {
  if ($length <= 0) {
    return '';
  }

  // Each individual read is capped at 64KB.
  $chunk_size = min($length, 64 * 1024);
  $kept = '';

  while (true) {
    $chunk = fread($stream, $chunk_size);
    if ($chunk === false) {
      throw new Exception(pht('Failed to read from %s', $description));
    }

    $chunk_bytes = strlen($chunk);

    // Keep bytes only while we are still under the limit. Once the limit is
    // exhausted, further data is still read from the stream but thrown away.
    if ($chunk_bytes > 0 && $limit > 0) {
      if ($chunk_bytes > $limit) {
        $chunk = substr($chunk, 0, $limit);
      }

      $kept .= $chunk;
      $limit -= strlen($chunk);
    }

    // Stop once enough bytes have been kept for this call.
    if (strlen($kept) >= $length) {
      break;
    }

    // Stop once a read returns no data.
    if ($chunk_bytes <= 0) {
      break;
    }
  }

  return $kept;
}
/**
* Begin or continue command execution.
*
* @return bool True if future has resolved.
* @task internal
*/
public function isReady() {
// This method is polled repeatedly. The first call launches the subprocess
// and returns false. Later calls push pending stdin bytes, drain stdout and
// stderr, and return true once the process has exited and both output pipes
// have reached EOF, or once the configured timeout has elapsed and the
// process has been killed.
//
// NOTE: We have soft dependencies on PhutilServiceProfiler and
// PhutilErrorTrap here. These dependencies are soft to avoid the need to
// build them into the Phage agent. Under normal circumstances, these
// classes are always available.
// No pipes yet means the subprocess has not been launched: do that now.
if (!$this->pipes) {
// NOTE: See note above about Phage.
if (class_exists('PhutilServiceProfiler')) {
$profiler = PhutilServiceProfiler::getInstance();
$this->profilerCallID = $profiler->beginServiceCall(
array(
'type' => 'exec',
'command' => (string)$this->command,
));
}
if (!$this->start) {
// We might already have started the timer via initiating resolution.
$this->start = microtime(true);
}
// The command may be a PhutilCommandString; if so, proc_open() needs the
// unmasked string form.
$unmasked_command = $this->command;
if ($unmasked_command instanceof PhutilCommandString) {
$unmasked_command = $unmasked_command->getUnmaskedString();
}
$pipes = array();
if (phutil_is_windows()) {
// See T4395. proc_open under Windows uses "cmd /C [cmd]", which will
// strip the first and last quote when there aren't exactly two quotes
// (and some other conditions as well). This results in a command that
// looks like `command" "path to my file" "something something` which is
// clearly wrong. By surrounding the command string with quotes we can
// be sure this process is harmless.
if (strpos($unmasked_command, '"') !== false) {
$unmasked_command = '"'.$unmasked_command.'"';
}
}
// A null environment is passed to proc_open() when no explicit environment
// has been configured on this future.
if ($this->hasEnv()) {
$env = $this->getEnv();
} else {
$env = null;
}
$cwd = $this->getCWD();
// NOTE: See note above about Phage.
if (class_exists('PhutilErrorTrap')) {
$trap = new PhutilErrorTrap();
} else {
$trap = null;
}
$spec = self::$descriptorSpec;
if ($this->useWindowsFileStreams) {
// In this mode the child writes stdout and stderr into temporary files
// instead of pipes; stdin keeps the default descriptor.
$this->windowsStdoutTempFile = new TempFile();
$this->windowsStderrTempFile = new TempFile();
$spec = array(
0 => self::$descriptorSpec[0], // stdin
1 => fopen($this->windowsStdoutTempFile, 'wb'), // stdout
2 => fopen($this->windowsStderrTempFile, 'wb'), // stderr
);
if (!$spec[1] || !$spec[2]) {
throw new Exception(pht(
'Unable to create temporary files for '.
'Windows stdout / stderr streams'));
}
}
$proc = @proc_open(
$unmasked_command,
$spec,
$pipes,
$cwd,
$env);
if ($this->useWindowsFileStreams) {
// Close our write handles and reopen the same temporary files for
// reading, so the rest of this method can treat them like output pipes.
fclose($spec[1]);
fclose($spec[2]);
$pipes = array(
0 => head($pipes), // stdin
1 => fopen($this->windowsStdoutTempFile, 'rb'), // stdout
2 => fopen($this->windowsStderrTempFile, 'rb'), // stderr
);
if (!$pipes[1] || !$pipes[2]) {
throw new Exception(pht(
'Unable to open temporary files for '.
'reading Windows stdout / stderr streams'));
}
}
// Collect whatever error information is available, for use in the
// exception message below if the launch failed.
// NOTE(review): error_get_last() returns an array (or null), which is then
// formatted with "%s" below -- confirm this fallback is intended.
if ($trap) {
$err = $trap->getErrorsAsString();
$trap->destroy();
} else {
$err = error_get_last();
}
if (!is_resource($proc)) {
throw new Exception(
pht(
'Failed to `%s`: %s',
'proc_open()',
$err));
}
$this->pipes = $pipes;
$this->proc = $proc;
list($stdin, $stdout, $stderr) = $pipes;
// Nonblocking mode is only requested on non-Windows platforms.
if (!phutil_is_windows()) {
// On Windows, we redirect process standard output and standard error
// through temporary files, and then use stream_select to determine
// if there's more data to read.
if ((!stream_set_blocking($stdout, false)) ||
(!stream_set_blocking($stderr, false)) ||
(!stream_set_blocking($stdin, false))) {
$this->__destruct();
throw new Exception(pht('Failed to set streams nonblocking.'));
}
}
$this->tryToCloseStdin();
// The process was only just launched, so report "not ready" and let the
// caller poll again.
return false;
}
// Pipes have been set but the process handle is gone: closeProcess() has
// already run, so this future is resolved.
if (!$this->proc) {
return true;
}
list($stdin, $stdout, $stderr) = $this->pipes;
// Push as much pending input as the pipe will accept right now.
while (isset($this->stdin) && $this->stdin->getByteLength()) {
$write_segment = $this->stdin->getAnyPrefix();
$bytes = fwrite($stdin, $write_segment);
if ($bytes === false) {
throw new Exception(pht('Unable to write to stdin!'));
} else if ($bytes) {
$this->stdin->removeBytesFromHead($bytes);
} else {
// Writes are blocked for now.
break;
}
}
$this->tryToCloseStdin();
// Read status before reading pipes so that we can never miss data that
// arrives between our last read and the process exiting.
$status = $this->procGetStatus();
// When a read buffer size is configured, only read as many bytes as still
// fit in each buffer; otherwise the per-call read is effectively unbounded.
$read_buffer_size = $this->readBufferSize;
$max_stdout_read_bytes = PHP_INT_MAX;
$max_stderr_read_bytes = PHP_INT_MAX;
if ($read_buffer_size !== null) {
$max_stdout_read_bytes = $read_buffer_size - strlen($this->stdout);
$max_stderr_read_bytes = $read_buffer_size - strlen($this->stderr);
}
if ($max_stdout_read_bytes > 0) {
$this->stdout .= $this->readAndDiscard(
$stdout,
$this->getStdoutSizeLimit() - strlen($this->stdout),
'stdout',
$max_stdout_read_bytes);
}
if ($max_stderr_read_bytes > 0) {
$this->stderr .= $this->readAndDiscard(
$stderr,
$this->getStderrSizeLimit() - strlen($this->stderr),
'stderr',
$max_stderr_read_bytes);
}
$is_done = false;
if (!$status['running']) {
// We may still have unread bytes on stdout or stderr, particularly if
// this future is being buffered and streamed. If we do, we don't want to
// consider the subprocess to have exited until we've read everything.
// See T9724 for context.
if (feof($stdout) && feof($stderr)) {
$is_done = true;
}
}
if ($is_done) {
if ($this->useWindowsFileStreams) {
fclose($stdout);
fclose($stderr);
}
+ // If the subprocess got nuked with `kill -9`, we get a -1 exitcode.
+ // Upgrade this to a slightly more informative value by examining the
+ // terminating signal code.
+ $err = $status['exitcode'];
+ if ($err == -1) {
+ if ($status['signaled']) {
+ $err = 128 + $status['termsig'];
+ }
+ }
+
$this->result = array(
- $status['exitcode'],
+ $err,
$this->stdout,
$this->stderr,
);
$this->closeProcess();
return true;
}
// The process has not finished yet: enforce the timeout, if one is set.
$elapsed = (microtime(true) - $this->start);
if ($this->timeout && ($elapsed >= $this->timeout)) {
$this->killedByTimeout = true;
$this->resolveKill();
return true;
}
// Still running and not timed out. There is no explicit return here, so
// the caller receives null (which is falsy) rather than false.
}
/**
* @return void
* @task internal
*/
public function __destruct() {
  // Only do cleanup work if we still hold a process handle.
  if ($this->proc) {
    $status = $this->procGetStatus();

    // NOTE: Calling proc_close() on a process which is still running hangs
    // indefinitely, so a running process must be killed explicitly rather
    // than simply closed.
    if (!$status['running']) {
      $this->closeProcess();
    } else {
      $this->resolveKill();
    }
  }
}
/**
* Close and free resources if necessary.
*
* @return void
* @task internal
*/
private function closeProcess() {
  // Close every pipe which is still open, then mark all three as closed.
  foreach ($this->pipes as $pipe) {
    if ($pipe !== null) {
      @fclose($pipe);
    }
  }
  $this->pipes = array(null, null, null);

  // Release the process handle, if we still hold one.
  if ($this->proc) {
    @proc_close($this->proc);
    $this->proc = null;
  }

  $this->stdin = null;

  // If no profiler call was opened for this future, we are done.
  if ($this->profilerCallID === null) {
    return;
  }

  $profiler = PhutilServiceProfiler::getInstance();

  // Report the error code from the result, when a result is available.
  $err = null;
  if ($this->result) {
    $err = idx($this->result, 0);
  }

  $profiler->endServiceCall(
    $this->profilerCallID,
    array(
      'err' => $err,
    ));

  $this->profilerCallID = null;
}
/**
* Execute `proc_get_status()`, but avoid pitfalls.
*
* @return dict Process status.
* @task internal
*/
private function procGetStatus() {
// After the process exits, we only get one chance to read proc_get_status()
// before it starts returning garbage. Make sure we don't throw away the
// last good read.
if ($this->procStatus) {
// Only a "not running" snapshot is served from the cache. While the cached
// status still says the process is running, we fall through and poll again.
if (!$this->procStatus['running']) {
return $this->procStatus;
}
}
// Poll the process and remember the result for subsequent calls.
$this->procStatus = proc_get_status($this->proc);
+
return $this->procStatus;
}
/**
* Try to close stdin, if we're done using it. This keeps us from hanging if
* the process on the other end of the pipe is waiting for EOF.
*
* @return void
* @task internal
*/
private function tryToCloseStdin() {
  // We may close stdin only if we have not been told to keep the pipe open
  // (by a call to write(..., true)) and every queued byte has been written.
  $may_close = $this->closePipe && !$this->stdin->getByteLength();
  if (!$may_close) {
    return;
  }

  list($in_pipe) = $this->pipes;

  // If the pipe is already gone there is nothing to do; otherwise close it
  // and record that it has been closed.
  if ($in_pipe) {
    @fclose($in_pipe);
    $this->pipes[0] = null;
  }
}
/**
 * Get the default wait time, capped by the time remaining before the
 * configured timeout (if any) elapses. Starts the execution timer if it
 * has not been started yet and a timeout is configured.
 *
 * @return float Wait time, never less than zero when a timeout is set.
 * @task internal
 */
public function getDefaultWait() {
  $wait = parent::getDefaultWait();

  // Without a timeout, the parent's default applies unchanged.
  if (!$this->timeout) {
    return $wait;
  }

  if (!$this->start) {
    $this->start = microtime(true);
  }

  $elapsed = (microtime(true) - $this->start);
  $remaining = $this->timeout - $elapsed;

  return max(0, min($remaining, $wait));
}
}

File Metadata

Mime Type
text/x-diff
Expires
Thu, Feb 26, 6:39 PM (14 h, 51 m)
Storage Engine
local-disk
Storage Format
Raw Data
Storage Handle
23/39/0db65473eb8216bcc1eb19a2b6ab

Event Timeline