mirror of
				https://github.com/ansible-collections/community.general.git
				synced 2025-10-25 21:44:00 -07:00 
			
		
		
		
	
		
			
				
	
	
		
			250 lines
		
	
	
	
		
			7.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			250 lines
		
	
	
	
		
			7.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # -*- coding: utf-8 -*-
 | |
| # (c) 2014, Brian Coca, Josh Drake, et al
 | |
| # (c) 2017 Ansible Project
 | |
| # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
 | |
| 
 | |
| from __future__ import (absolute_import, division, print_function)
 | |
| __metaclass__ = type
 | |
| 
 | |
| DOCUMENTATION = '''
 | |
|     author: Unknown (!UNKNOWN)
 | |
|     name: memcached
 | |
|     short_description: Use memcached DB for cache
 | |
|     description:
 | |
|         - This cache uses JSON formatted, per host records saved in memcached.
 | |
|     requirements:
 | |
|       - memcache (python lib)
 | |
|     options:
 | |
|       _uri:
 | |
|         description:
 | |
|           - List of connection information for the memcached DBs
 | |
|         default: ['127.0.0.1:11211']
 | |
|         type: list
 | |
|         env:
 | |
|           - name: ANSIBLE_CACHE_PLUGIN_CONNECTION
 | |
|         ini:
 | |
|           - key: fact_caching_connection
 | |
|             section: defaults
 | |
|       _prefix:
 | |
|         description: User defined prefix to use when creating the DB entries
 | |
|         default: ansible_facts
 | |
|         env:
 | |
|           - name: ANSIBLE_CACHE_PLUGIN_PREFIX
 | |
|         ini:
 | |
|           - key: fact_caching_prefix
 | |
|             section: defaults
 | |
|       _timeout:
 | |
|         default: 86400
 | |
|         description: Expiration timeout in seconds for the cache plugin data. Set to 0 to never expire
 | |
|         env:
 | |
|           - name: ANSIBLE_CACHE_PLUGIN_TIMEOUT
 | |
|         ini:
 | |
|           - key: fact_caching_timeout
 | |
|             section: defaults
 | |
|         type: integer
 | |
| '''
 | |
| 
 | |
| import collections
 | |
| import os
 | |
| import time
 | |
| from multiprocessing import Lock
 | |
| from itertools import chain
 | |
| 
 | |
| from ansible import constants as C
 | |
| from ansible.errors import AnsibleError
 | |
| from ansible.module_utils.common._collections_compat import MutableSet
 | |
| from ansible.plugins.cache import BaseCacheModule
 | |
| from ansible.release import __version__ as ansible_base_version
 | |
| from ansible.utils.display import Display
 | |
| 
 | |
| try:
 | |
|     import memcache
 | |
|     HAS_MEMCACHE = True
 | |
| except ImportError:
 | |
|     HAS_MEMCACHE = False
 | |
| 
 | |
| display = Display()
 | |
| 
 | |
| 
 | |
| class ProxyClientPool(object):
 | |
|     """
 | |
|     Memcached connection pooling for thread/fork safety. Inspired by py-redis
 | |
|     connection pool.
 | |
| 
 | |
|     Available connections are maintained in a deque and released in a FIFO manner.
 | |
|     """
 | |
| 
 | |
|     def __init__(self, *args, **kwargs):
 | |
|         self.max_connections = kwargs.pop('max_connections', 1024)
 | |
|         self.connection_args = args
 | |
|         self.connection_kwargs = kwargs
 | |
|         self.reset()
 | |
| 
 | |
|     def reset(self):
 | |
|         self.pid = os.getpid()
 | |
|         self._num_connections = 0
 | |
|         self._available_connections = collections.deque(maxlen=self.max_connections)
 | |
|         self._locked_connections = set()
 | |
|         self._lock = Lock()
 | |
| 
 | |
|     def _check_safe(self):
 | |
|         if self.pid != os.getpid():
 | |
|             with self._lock:
 | |
|                 if self.pid == os.getpid():
 | |
|                     # bail out - another thread already acquired the lock
 | |
|                     return
 | |
|                 self.disconnect_all()
 | |
|                 self.reset()
 | |
| 
 | |
|     def get_connection(self):
 | |
|         self._check_safe()
 | |
|         try:
 | |
|             connection = self._available_connections.popleft()
 | |
|         except IndexError:
 | |
|             connection = self.create_connection()
 | |
|         self._locked_connections.add(connection)
 | |
|         return connection
 | |
| 
 | |
|     def create_connection(self):
 | |
|         if self._num_connections >= self.max_connections:
 | |
|             raise RuntimeError("Too many memcached connections")
 | |
|         self._num_connections += 1
 | |
|         return memcache.Client(*self.connection_args, **self.connection_kwargs)
 | |
| 
 | |
|     def release_connection(self, connection):
 | |
|         self._check_safe()
 | |
|         self._locked_connections.remove(connection)
 | |
|         self._available_connections.append(connection)
 | |
| 
 | |
|     def disconnect_all(self):
 | |
|         for conn in chain(self._available_connections, self._locked_connections):
 | |
|             conn.disconnect_all()
 | |
| 
 | |
|     def __getattr__(self, name):
 | |
|         def wrapped(*args, **kwargs):
 | |
|             return self._proxy_client(name, *args, **kwargs)
 | |
|         return wrapped
 | |
| 
 | |
|     def _proxy_client(self, name, *args, **kwargs):
 | |
|         conn = self.get_connection()
 | |
| 
 | |
|         try:
 | |
|             return getattr(conn, name)(*args, **kwargs)
 | |
|         finally:
 | |
|             self.release_connection(conn)
 | |
| 
 | |
| 
 | |
| class CacheModuleKeys(MutableSet):
 | |
|     """
 | |
|     A set subclass that keeps track of insertion time and persists
 | |
|     the set in memcached.
 | |
|     """
 | |
|     PREFIX = 'ansible_cache_keys'
 | |
| 
 | |
|     def __init__(self, cache, *args, **kwargs):
 | |
|         self._cache = cache
 | |
|         self._keyset = dict(*args, **kwargs)
 | |
| 
 | |
|     def __contains__(self, key):
 | |
|         return key in self._keyset
 | |
| 
 | |
|     def __iter__(self):
 | |
|         return iter(self._keyset)
 | |
| 
 | |
|     def __len__(self):
 | |
|         return len(self._keyset)
 | |
| 
 | |
|     def add(self, value):
 | |
|         self._keyset[value] = time.time()
 | |
|         self._cache.set(self.PREFIX, self._keyset)
 | |
| 
 | |
|     def discard(self, value):
 | |
|         del self._keyset[value]
 | |
|         self._cache.set(self.PREFIX, self._keyset)
 | |
| 
 | |
|     def remove_by_timerange(self, s_min, s_max):
 | |
|         for k in list(self._keyset.keys()):
 | |
|             t = self._keyset[k]
 | |
|             if s_min < t < s_max:
 | |
|                 del self._keyset[k]
 | |
|         self._cache.set(self.PREFIX, self._keyset)
 | |
| 
 | |
| 
 | |
| class CacheModule(BaseCacheModule):
 | |
| 
 | |
|     def __init__(self, *args, **kwargs):
 | |
|         connection = ['127.0.0.1:11211']
 | |
| 
 | |
|         try:
 | |
|             super(CacheModule, self).__init__(*args, **kwargs)
 | |
|             if self.get_option('_uri'):
 | |
|                 connection = self.get_option('_uri')
 | |
|             self._timeout = self.get_option('_timeout')
 | |
|             self._prefix = self.get_option('_prefix')
 | |
|         except KeyError:
 | |
|             # TODO: remove once we no longer support Ansible 2.9
 | |
|             if not ansible_base_version.startswith('2.9.'):
 | |
|                 raise AnsibleError("Do not import CacheModules directly. Use ansible.plugins.loader.cache_loader instead.")
 | |
|             if C.CACHE_PLUGIN_CONNECTION:
 | |
|                 connection = C.CACHE_PLUGIN_CONNECTION.split(',')
 | |
|             self._timeout = C.CACHE_PLUGIN_TIMEOUT
 | |
|             self._prefix = C.CACHE_PLUGIN_PREFIX
 | |
| 
 | |
|         if not HAS_MEMCACHE:
 | |
|             raise AnsibleError("python-memcached is required for the memcached fact cache")
 | |
| 
 | |
|         self._cache = {}
 | |
|         self._db = ProxyClientPool(connection, debug=0)
 | |
|         self._keys = CacheModuleKeys(self._db, self._db.get(CacheModuleKeys.PREFIX) or [])
 | |
| 
 | |
|     def _make_key(self, key):
 | |
|         return "{0}{1}".format(self._prefix, key)
 | |
| 
 | |
|     def _expire_keys(self):
 | |
|         if self._timeout > 0:
 | |
|             expiry_age = time.time() - self._timeout
 | |
|             self._keys.remove_by_timerange(0, expiry_age)
 | |
| 
 | |
|     def get(self, key):
 | |
|         if key not in self._cache:
 | |
|             value = self._db.get(self._make_key(key))
 | |
|             # guard against the key not being removed from the keyset;
 | |
|             # this could happen in cases where the timeout value is changed
 | |
|             # between invocations
 | |
|             if value is None:
 | |
|                 self.delete(key)
 | |
|                 raise KeyError
 | |
|             self._cache[key] = value
 | |
| 
 | |
|         return self._cache.get(key)
 | |
| 
 | |
|     def set(self, key, value):
 | |
|         self._db.set(self._make_key(key), value, time=self._timeout, min_compress_len=1)
 | |
|         self._cache[key] = value
 | |
|         self._keys.add(key)
 | |
| 
 | |
|     def keys(self):
 | |
|         self._expire_keys()
 | |
|         return list(iter(self._keys))
 | |
| 
 | |
|     def contains(self, key):
 | |
|         self._expire_keys()
 | |
|         return key in self._keys
 | |
| 
 | |
|     def delete(self, key):
 | |
|         del self._cache[key]
 | |
|         self._db.delete(self._make_key(key))
 | |
|         self._keys.discard(key)
 | |
| 
 | |
|     def flush(self):
 | |
|         for key in self.keys():
 | |
|             self.delete(key)
 | |
| 
 | |
|     def copy(self):
 | |
|         return self._keys.copy()
 | |
| 
 | |
|     def __getstate__(self):
 | |
|         return dict()
 | |
| 
 | |
|     def __setstate__(self, data):
 | |
|         self.__init__()
 |